
Commit

Merge branch 'main' into dylan/fix_apply_topn_transpose_rule
chenzl25 authored Jun 24, 2024
2 parents b16a600 + 9c73c30 commit 2b5d95a
Showing 62 changed files with 924 additions and 437 deletions.
599 changes: 392 additions & 207 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion Cargo.toml
@@ -77,6 +77,12 @@ repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.9.4", features = ["nightly"] }
+apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
+"snappy",
+"zstandard",
+"bzip",
+"xz",
+] }
auto_enums = { version = "0.8", features = ["futures03", "tokio1"] }
await-tree = "0.2.1"
aws-config = { version = "1", default-features = false, features = [
@@ -125,9 +131,13 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "492c244e0be91feb659c0cd48a624bbd96045a33" }
prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72fbd1dd8c592f05eeeb79223c8a6a33c297", features = [
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "07d53893d7788b4e41fc11efad8a6be828405c31", features = [
"prometheus",
] }
+arrow-array-iceberg = { package = "arrow-array", version = "52" }
+arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
+arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
+arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
arrow-array = "50"
arrow-arith = "50"
arrow-cast = "50"
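The four `arrow-*-iceberg` entries added above use Cargo's `package = ...` rename so that arrow v52 (used on the Iceberg code path) can coexist with the arrow v50 crates pinned alongside them. A minimal sketch of how the two majors then sit side by side in code, assuming a manifest with both `arrow-array = "50"` and the v52 rename shown above (illustrative, not a file from this repo):

```rust
// Assumes Cargo.toml contains:
//   arrow-array = "50"
//   arrow-array-iceberg = { package = "arrow-array", version = "52" }
use arrow_array::Int32Array; // arrow-array v50, default name
use arrow_array_iceberg::Int32Array as IcebergInt32Array; // arrow-array v52, renamed

fn main() {
    // The two types come from different major versions and are not interchangeable.
    let v50 = Int32Array::from(vec![1, 2, 3]);
    let v52 = IcebergInt32Array::from(vec![4, 5]);
    println!("{v50:?} {v52:?}");
}
```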
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions e2e_test/batch/functions/at_time_zone.slt.part
@@ -21,3 +21,19 @@ query T
select '2022-11-06 01:00:00'::timestamp AT TIME ZONE 'us/pacific';
----
2022-11-06 09:00:00+00:00
+
+# non-literal zone
+statement ok
+create table t (local timestamp, tz varchar);
+
+statement ok
+insert into t values ('2024-06-10 12:00:00', 'US/Pacific'), ('2024-06-10 13:00:00', 'Asia/Singapore');
+
+query T
+select local AT TIME ZONE tz from t order by 1;
+----
+2024-06-10 05:00:00+00:00
+2024-06-10 19:00:00+00:00
+
+statement ok
+drop table t;
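The new cases exercise `AT TIME ZONE` with a per-row (non-literal) zone taken from a `varchar` column. A minimal sketch of the conversion each row asserts, assuming the `chrono` and `chrono-tz` crates (an illustration of the semantics, not RisingWave's implementation):

```rust
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono_tz::Tz;

// Convert a zone-less timestamp to UTC using a per-row zone name, mirroring
// what `local AT TIME ZONE tz` computes for each row of the test table.
fn at_time_zone(local: &str, tz: &str) -> Option<DateTime<Utc>> {
    let naive = NaiveDateTime::parse_from_str(local, "%Y-%m-%d %H:%M:%S").ok()?;
    let zone: Tz = tz.parse().ok()?;
    // `single()` rejects ambiguous or nonexistent local times around DST shifts.
    let zoned = naive.and_local_timezone(zone).single()?;
    Some(zoned.with_timezone(&Utc))
}

fn main() {
    // Matches the expected rows: 12:00 US/Pacific -> 19:00 UTC,
    // 13:00 Asia/Singapore -> 05:00 UTC.
    println!("{:?}", at_time_zone("2024-06-10 12:00:00", "US/Pacific"));
    println!("{:?}", at_time_zone("2024-06-10 13:00:00", "Asia/Singapore"));
}
```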
2 changes: 1 addition & 1 deletion e2e_test/iceberg/main.py
@@ -50,7 +50,7 @@ def execute_slt(args, slt):
cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}"
print(f"Command line is [{cmd}]")
subprocess.run(cmd, shell=True, check=True)
-time.sleep(30)
+time.sleep(15)


def verify_result(args, verify_sql, verify_schema, verify_data):
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
@@ -273,7 +273,7 @@ create sink si_kafka_without_snapshot from t_kafka with (
snapshot = 'false',
);

-sleep 1s
+sleep 2s

query T
select rw_key from t_sink_text_id order by rw_key
@@ -293,4 +293,4 @@ insert into t_kafka values
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');

statement ok
-drop table t_sink_text_id;
+drop table t_sink_text_id;
20 changes: 19 additions & 1 deletion e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
@@ -7,6 +7,12 @@ mysql -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"
system ok
mysql --protocol=tcp -u root mytest -e "
DROP TABLE IF EXISTS mysql_types_test;
+CREATE TABLE customers(
+id BIGINT PRIMARY KEY,
+modified DATETIME,
+custinfo JSON
+);
+ALTER TABLE customers ADD INDEX zipsa( (CAST(custinfo->'zipcode' AS UNSIGNED ARRAY)) );
CREATE TABLE IF NOT EXISTS mysql_types_test(
c_boolean boolean,
c_bit bit,
@@ -34,7 +40,6 @@ mysql --protocol=tcp -u root mytest -e "
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
"

-
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
@@ -46,6 +51,19 @@ create source mysql_source with (
server.id = '5601'
);

+statement ok
+create table rw_customers (*) from mysql_source table 'mytest.customers';
+
+# Name, Type, Is Hidden, Description
+query TTTT
+describe rw_customers;
+----
+id bigint false NULL
+modified timestamp without time zone false NULL
+custinfo jsonb false NULL
+primary key id NULL NULL
+distribution key id NULL NULL
+table description rw_customers NULL NULL

statement ok
create table rw_mysql_types_test (*) from mysql_source table 'mytest.mysql_types_test';
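The `describe` output above pins down the schema auto-mapping for the new CDC table: MySQL `BIGINT` becomes `bigint`, `DATETIME` becomes `timestamp without time zone`, and `JSON` becomes `jsonb`; the `customers` table also carries a multi-valued index over the JSON column, presumably to check that auto schema mapping tolerates such indexes. A hedged sketch of that type mapping as a plain lookup (illustrative only, not the connector's code):

```rust
// Maps the MySQL column types used in the `customers` table to the
// RisingWave types asserted by `describe rw_customers`. Illustrative only.
fn map_mysql_type(mysql_ty: &str) -> Option<&'static str> {
    match mysql_ty.to_ascii_uppercase().as_str() {
        "BIGINT" => Some("bigint"),
        "DATETIME" => Some("timestamp without time zone"),
        "JSON" => Some("jsonb"),
        _ => None, // other MySQL types are out of scope for this sketch
    }
}

fn main() {
    for ty in ["BIGINT", "DATETIME", "JSON"] {
        println!("{ty} -> {}", map_mysql_type(ty).unwrap());
    }
}
```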
4 changes: 2 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1294,14 +1294,14 @@ def section_streaming_actors(outer_panels):
"The number of matched rows on the opposite side",
[
*quantile(
-lambda quantile, legend: panels.target(
+lambda quantile, legend: panels.target_hidden(
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_matched_join_keys_bucket')}[$__rate_interval])) by (le, fragment_id, table_id, {COMPONENT_LABEL}))",
f"p{legend} - fragment {{{{fragment_id}}}} table_id {{{{table_id}}}} - {{{{{COMPONENT_LABEL}}}}}",
),
[90, 99, "max"],
),
panels.target(
f"sum by(le, job, actor_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, fragment_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval])) >= 0",
f"sum by(le, {COMPONENT_LABEL}, fragment_id, table_id) (rate({metric('stream_join_matched_join_keys_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, fragment_id, table_id) (rate({table_metric('stream_join_matched_join_keys_count')}[$__rate_interval])) >= 0",
"avg - fragment {{fragment_id}} table_id {{table_id}} - {{%s}}"
% COMPONENT_LABEL,
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

@@ -202,11 +202,15 @@ var record = event.value();
byte[] key =
keyConverter.fromConnectData(
record.topic(), record.keySchema(), record.key());
+String msgPayload =
+payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
+// key can be null if the table has no primary key
+String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8);
var message =
msgBuilder
.setFullTableName(fullTableName)
-.setPayload(new String(payload, StandardCharsets.UTF_8))
-.setKey(new String(key, StandardCharsets.UTF_8))
+.setPayload(msgPayload)
+.setKey(msgKey)
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug(
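The change above stops assuming `payload` and `key` are non-null before building the message: the key converter yields null for tables without a primary key, and the payload can likewise be absent. A small Rust analog of the same defensive conversion (a sketch of the idea, not the connector's code):

```rust
// Convert possibly-missing bytes to a string, falling back to "" instead of
// panicking; mirrors the null checks added in the Java change above.
fn utf8_or_empty(bytes: Option<&[u8]>) -> String {
    bytes
        .map(|b| String::from_utf8_lossy(b).into_owned())
        .unwrap_or_default()
}

fn main() {
    assert_eq!(utf8_or_empty(Some(b"key-1".as_slice())), "key-1");
    assert_eq!(utf8_or_empty(None), ""); // no primary key -> empty string
    println!("ok");
}
```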
8 changes: 4 additions & 4 deletions lints/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lints/Cargo.toml
@@ -14,7 +14,7 @@ path = "ui/format_error.rs"
# See `README.md` before bumping the version.
# Remember to update the version in `ci/Dockerfile` as well.
[dependencies]
-clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "9d6f41691ed9dbfaec2a2df2661c42451f2fe0d3" }
+clippy_utils = { git = "https://github.com/rust-lang/rust-clippy", rev = "fca4e16ffb8c07186ee23becd44cd5c9fb51896c" }
dylint_linting = "3.1.0"
itertools = "0.12"

2 changes: 1 addition & 1 deletion lints/rust-toolchain
@@ -1,5 +1,5 @@
# See `README.md` before bumping the version.

[toolchain]
channel = "nightly-2024-03-21"
channel = "nightly-2024-04-18"
components = ["llvm-tools-preview", "rustc-dev"]
22 changes: 11 additions & 11 deletions lints/src/format_error.rs
@@ -168,7 +168,7 @@ fn check_fmt_arg_in_anyhow_context(cx: &LateContext<'_>, arg_expr: &Expr<'_>) {
);
}

-fn check_fmt_arg_with_help(cx: &LateContext<'_>, arg_expr: &Expr<'_>, help: impl Help) {
+fn check_fmt_arg_with_help(cx: &LateContext<'_>, arg_expr: &Expr<'_>, help: impl Help + 'static) {
check_arg(cx, arg_expr, arg_expr.span, help);
}

@@ -181,7 +181,7 @@ fn check_to_string_call(cx: &LateContext<'_>, receiver: &Expr<'_>, to_string_spa
);
}

-fn check_arg(cx: &LateContext<'_>, arg_expr: &Expr<'_>, span: Span, help: impl Help) {
+fn check_arg(cx: &LateContext<'_>, arg_expr: &Expr<'_>, span: Span, help: impl Help + 'static) {
let Some(error_trait_id) = cx.tcx.get_diagnostic_item(sym::Error) else {
return;
};
@@ -218,31 +218,31 @@ fn check_arg(cx: &LateContext<'_>, arg_expr: &Expr<'_>, span: Span, help: impl H
}

trait Help {
fn normal_help(&self) -> &str;
fn anyhow_help(&self) -> &str {
fn normal_help(&self) -> &'static str;
fn anyhow_help(&self) -> &'static str {
self.normal_help()
}
fn report_help(&self) -> Option<&str> {
fn report_help(&self) -> Option<&'static str> {
None
}
}

impl Help for &str {
fn normal_help(&self) -> &str {
impl Help for &'static str {
fn normal_help(&self) -> &'static str {
self
}
}

impl Help for (&str, &str, &str) {
fn normal_help(&self) -> &str {
impl Help for (&'static str, &'static str, &'static str) {
fn normal_help(&self) -> &'static str {
self.0
}

fn anyhow_help(&self) -> &str {
fn anyhow_help(&self) -> &'static str {
self.1
}

fn report_help(&self) -> Option<&str> {
fn report_help(&self) -> Option<&'static str> {
Some(self.2)
}
}
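The recurring edit in this file replaces borrowed `&str` returns with `&'static str` and adds a `'static` bound where `impl Help` is accepted. Without the bound, a help value could borrow from the expression being checked, tying the diagnostic text to that borrow; requiring `'static` lets the strings be stored or emitted after the borrow ends. A minimal sketch of why the bound matters, using stand-in types rather than the lint's real ones:

```rust
trait Help {
    fn normal_help(&self) -> &'static str;
    fn anyhow_help(&self) -> &'static str {
        self.normal_help()
    }
}

impl Help for &'static str {
    fn normal_help(&self) -> &'static str {
        self
    }
}

fn emit_later(help: impl Help + 'static) -> Box<dyn Fn() -> &'static str> {
    // The `'static` bound is what allows moving `help` into a closure that
    // outlives the current call, deferring the diagnostic.
    Box::new(move || help.normal_help())
}

fn main() {
    let deferred = emit_later("use `.to_report_string()` instead");
    println!("{}", deferred());
}
```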
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -16,6 +16,7 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
+arrow-array-iceberg = { workspace = true }
arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
2 changes: 1 addition & 1 deletion src/batch/benches/sort.rs
@@ -64,7 +64,7 @@ fn create_order_by_executor(
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
-false,
+None,
BatchSpillMetrics::for_test(),
))
}
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
@@ -44,7 +44,7 @@ use crate::executor::{
};
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{
-SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
+SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

@@ -325,7 +325,7 @@ impl AggSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
-let op = SpillOp::create(dir)?;
+let op = SpillOp::create(dir, SpillBackend::Disk)?;
let agg_state_writers = Vec::with_capacity(partition_num);
let agg_state_chunk_builder = Vec::with_capacity(partition_num);
let input_writers = Vec::with_capacity(partition_num);
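`SpillOp::create` now takes an explicit `SpillBackend` argument instead of implying a backend, and call sites that previously passed a bare `false` (see the `sort.rs` bench above and the lookup-join test below) now pass `None`, suggesting the spill toggle became an `Option` of a backend choice. A hypothetical sketch of that constructor shape; the types are stand-ins, not the crate's real definitions:

```rust
// Stand-in types to illustrate the new constructor shape; not the real
// `risingwave_batch` definitions.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum SpillBackend {
    Disk,
    Memory, // hypothetical second variant, e.g. for tests
}

#[derive(Debug)]
struct SpillOp {
    dir: String,
    backend: SpillBackend,
}

impl SpillOp {
    // Callers now choose the backend explicitly rather than relying on a default.
    fn create(dir: String, backend: SpillBackend) -> std::io::Result<Self> {
        Ok(Self { dir, backend })
    }
}

fn main() -> std::io::Result<()> {
    let op = SpillOp::create("/hash-agg-1234/".into(), SpillBackend::Disk)?;
    println!("{op:?}");
    Ok(())
}
```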
6 changes: 3 additions & 3 deletions src/batch/src/executor/iceberg_scan.rs
@@ -15,11 +15,11 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::anyhow;
-use arrow_array::RecordBatch;
+use arrow_array_iceberg::RecordBatch;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use icelake::io::{FileScan, TableScan};
-use risingwave_common::array::arrow::{FromArrow, IcebergArrowConvert};
+use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_connector::sink::iceberg::IcebergConfig;

@@ -150,7 +150,7 @@ impl IcebergScanExecutor {
}

fn record_batch_to_chunk(record_batch: RecordBatch) -> Result<DataChunk, BatchError> {
-Ok(IcebergArrowConvert.from_record_batch(&record_batch)?)
+Ok(IcebergArrowConvert.chunk_from_record_batch(&record_batch)?)
}
}

4 changes: 2 additions & 2 deletions src/batch/src/executor/join/hash_join.rs
@@ -44,7 +44,7 @@ use crate::executor::{
use crate::monitor::BatchSpillMetrics;
use crate::risingwave_common::hash::NullBitmap;
use crate::spill::spill_op::{
-SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
+SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};

@@ -273,7 +273,7 @@ impl JoinSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", join_identity, suffix_uuid);
-let op = SpillOp::create(dir)?;
+let op = SpillOp::create(dir, SpillBackend::Disk)?;
let probe_side_writers = Vec::with_capacity(partition_num);
let build_side_writers = Vec::with_capacity(partition_num);
let probe_side_chunk_builders = Vec::with_capacity(partition_num);
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
@@ -601,7 +601,7 @@ mod tests {
"SortExecutor".into(),
CHUNK_SIZE,
MemoryContext::none(),
-false,
+None,
BatchSpillMetrics::for_test(),
))
}
