
Commit dabcb44

Merge branch 'main' into wangzheng/fix_time_travel
zwang28 authored Aug 19, 2024
2 parents e861ac0 + 7b97788 commit dabcb44
Showing 35 changed files with 984 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/auto-create-doc-issue-by-pr.yml
@@ -39,7 +39,7 @@ jobs:
- name: Create issue in other repository
if: steps.check_merged.outputs.merged == 'true' && steps.check_documentation_update.outputs.documentation_update == 'true'
run: |
ISSUE_CONTENT="This issue tracks the documentation update needed for the merged PR #$PR_ID.\n\nSource PR URL: $PR_URL\nSource PR Merged At: $PR_MERGED_AT"
ISSUE_CONTENT="This issue tracks the documentation update needed for the merged PR #$PR_ID.\n\nSource PR URL: $PR_URL\nSource PR Merged At: $PR_MERGED_AT\n\nIf it is a major improvement that deserves a new page or a new section in the documentation, please check if we should label it as an experiment feature."
curl -X POST \
-H "Authorization: Bearer ${{ secrets.ACCESS_TOKEN }}" \
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
@@ -130,7 +130,7 @@ echo "> inserted new rows into postgres"

# start cluster w/o clean-data
unset RISINGWAVE_CI
export RUST_LOG="events::stream::message::chunk=trace,risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \
export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \

risedev dev ci-1cn-1fe-with-recovery
echo "> wait for cluster recovery finish"
3 changes: 0 additions & 3 deletions proto/meta.proto
@@ -321,9 +321,6 @@ message AddWorkerNodeRequest {
}

message AddWorkerNodeResponse {
reserved 3;
reserved "system_params";
common.Status status = 1;
optional uint32 node_id = 2;
string cluster_id = 4;
}
6 changes: 3 additions & 3 deletions src/common/src/array/iterator.rs
@@ -56,10 +56,10 @@ mod tests {

use super::*;
use crate::array::{ArrayBuilder, ArrayImpl};
use crate::for_all_array_variants;
use crate::for_all_variants;

macro_rules! test_trusted_len {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
paste! {
#[test]
@@ -91,5 +91,5 @@
};
}

for_all_array_variants! { test_trusted_len }
for_all_variants! { test_trusted_len }
}
14 changes: 7 additions & 7 deletions src/common/src/array/mod.rs
@@ -68,7 +68,7 @@ pub use self::error::ArrayError;
pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder};
use crate::bitmap::Bitmap;
use crate::types::*;
use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_array_variants};
use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_variants};
pub type ArrayResult<T> = Result<T, ArrayError>;

pub type I64Array = PrimitiveArray<i64>;
@@ -325,7 +325,7 @@ impl<A: Array> CompactableArray for A {

/// Define `ArrayImpl` with macro.
macro_rules! array_impl_enum {
( $( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
( $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
/// `ArrayImpl` embeds all possible array in `array` module.
#[derive(Debug, Clone, EstimateSize)]
pub enum ArrayImpl {
@@ -334,7 +334,7 @@
};
}

for_all_array_variants! { array_impl_enum }
for_all_variants! { array_impl_enum }

// We cannot put the From implementations in impl_convert,
// because then we can't prove for all `T: PrimitiveArrayItemType`,
@@ -401,7 +401,7 @@ impl From<MapArray> for ArrayImpl {
/// * `ArrayImpl -> Array` with `From` trait.
/// * `ArrayBuilder -> ArrayBuilderImpl` with `From` trait.
macro_rules! impl_convert {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
paste! {
impl ArrayImpl {
@@ -455,11 +455,11 @@ macro_rules! impl_convert {
};
}

for_all_array_variants! { impl_convert }
for_all_variants! { impl_convert }

/// Define `ArrayImplBuilder` with macro.
macro_rules! array_builder_impl_enum {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
/// `ArrayBuilderImpl` embeds all possible array in `array` module.
#[derive(Debug, Clone, EstimateSize)]
pub enum ArrayBuilderImpl {
@@ -468,7 +468,7 @@ macro_rules! array_builder_impl_enum {
};
}

for_all_array_variants! { array_builder_impl_enum }
for_all_variants! { array_builder_impl_enum }

/// Implements all `ArrayBuilder` functions with `for_all_variant`.
impl ArrayBuilderImpl {
6 changes: 3 additions & 3 deletions src/common/src/test_utils/rand_array.rs
@@ -201,12 +201,12 @@
#[cfg(test)]
mod tests {
use super::*;
use crate::for_all_array_variants;
use crate::for_all_variants;

#[test]
fn test_create_array() {
macro_rules! gen_rand_array {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
{
let array = seed_rand_array::<$array>(10, 1024, 0.5);
@@ -216,6 +216,6 @@ mod tests {
};
}

for_all_array_variants! { gen_rand_array }
for_all_variants! { gen_rand_array }
}
}
57 changes: 0 additions & 57 deletions src/common/src/types/macros.rs
@@ -64,63 +64,6 @@ macro_rules! for_all_variants {
};
}

/// The projected version of `for_all_variants` for handling scalar variants.
///
/// Arguments are `$variant_name`, `$suffix_name`, `$scalar`, `$scalar_ref`.
#[macro_export(local_inner_macros)]
macro_rules! for_all_scalar_variants {
($macro:ident $(, $x:tt)*) => {
for_all_variants! { project_scalar_variants, $macro, [ $($x, )* ] }
};
}
#[macro_export]
macro_rules! project_scalar_variants {
($macro:ident, [ $($x:tt, )* ], $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$macro! {
$($x, )*
$( { $variant_name, $suffix_name, $scalar, $scalar_ref } ),*
}
};
}

/// The projected version of `for_all_variants` for handling array variants.
///
/// Arguments are `$variant_name`, `$suffix_name`, `$array`, `$builder`.
#[macro_export(local_inner_macros)]
macro_rules! for_all_array_variants {
($macro:ident $(, $x:tt)*) => {
for_all_variants! { project_array_variants, $macro, [ $($x, )* ] }
};
}
#[macro_export]
macro_rules! project_array_variants {
($macro:ident, [ $($x:tt, )* ], $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$macro! {
$($x, )*
$( { $variant_name, $suffix_name, $array, $builder } ),*
}
};
}

/// The projected version of `for_all_variants` for handling mapping of data types and array types.
///
/// Arguments are `$data_type`, `$variant_name`.
#[macro_export(local_inner_macros)]
macro_rules! for_all_type_pairs {
($macro:ident $(, $x:tt)*) => {
for_all_variants! { project_type_pairs, $macro, [ $($x, )* ] }
};
}
#[macro_export]
macro_rules! project_type_pairs {
($macro:ident, [ $($x:tt, )* ], $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$macro! {
$($x, )*
$( { $data_type, $variant_name } ),*
}
};
}

/// Helper macro for expanding type aliases and constants. Internally used by `dispatch_` macros.
#[macro_export]
macro_rules! do_expand_alias {
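
The rest of this commit is largely mechanical fallout of the macro change above: with the projection macros (`for_all_scalar_variants!`, `for_all_array_variants!`, `for_all_type_pairs!`) deleted, every callback macro now matches the full seven-capture list that `for_all_variants!` passes and simply ignores the captures it does not use. Below is a minimal, self-contained sketch of that convention, using made-up variants and `Vec`-based stand-ins for the array and builder types rather than RisingWave's real macro or types:

macro_rules! for_all_variants {
    ($macro:ident $(, $x:tt)*) => {
        $macro! {
            $($x, )*
            // { $data_type, $variant_name, $suffix_name, $scalar, $scalar_ref, $array, $builder }
            { Int16, Int16, int16, i16, i16, Vec<i16>, Vec<i16> },
            { Float64, Float64, float64, f64, f64, Vec<f64>, Vec<f64> }
        }
    };
}

// A callback in the style of `array_impl_enum!` above: only `$variant_name` and
// `$array` are used; the other captures are matched but ignored.
macro_rules! array_impl_enum {
    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        #[derive(Debug)]
        pub enum ArrayImpl {
            $( $variant_name($array) ),*
        }
    };
}

for_all_variants! { array_impl_enum }

fn main() {
    // Expands to `enum ArrayImpl { Int16(Vec<i16>), Float64(Vec<f64>) }`.
    println!("{:?}", ArrayImpl::Int16(vec![1, 2, 3]));
}

The trade-off appears to be a longer matcher at every call site in exchange for a single place, `for_all_variants!` itself, that lists all variants.
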
19 changes: 9 additions & 10 deletions src/common/src/types/mod.rs
@@ -42,8 +42,7 @@ pub use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructVa
use crate::cast::{str_to_bool, str_to_bytea};
use crate::error::BoxedError;
use crate::{
dispatch_data_types, dispatch_scalar_ref_variants, dispatch_scalar_variants,
for_all_scalar_variants, for_all_type_pairs,
dispatch_data_types, dispatch_scalar_ref_variants, dispatch_scalar_variants, for_all_variants,
};

mod cow;
@@ -552,7 +551,7 @@ pub trait ScalarRef<'a>: private::ScalarBounds<ScalarRefImpl<'a>> + 'a + Copy {

/// Define `ScalarImpl` and `ScalarRefImpl` with macro.
macro_rules! scalar_impl_enum {
($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
/// `ScalarImpl` embeds all possible scalars in the evaluation framework.
///
/// Note: `ScalarImpl` doesn't contain all information of its `DataType`,
@@ -580,7 +579,7 @@ macro_rules! scalar_impl_enum {
};
}

for_all_scalar_variants! { scalar_impl_enum }
for_all_variants! { scalar_impl_enum }

// We MUST NOT implement `Ord` for `ScalarImpl` because that will make `Datum` derive an incorrect
// default `Ord`. To get a default-ordered `ScalarImpl`/`ScalarRefImpl`/`Datum`/`DatumRef`, you can
@@ -686,7 +685,7 @@ macro_rules! for_all_native_types {
/// * `&ScalarImpl -> &Scalar` with `impl.as_int16()`.
/// * `ScalarImpl -> Scalar` with `impl.into_int16()`.
macro_rules! impl_convert {
($( { $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
impl From<$scalar> for ScalarImpl {
fn from(val: $scalar) -> Self {
@@ -758,7 +757,7 @@ macro_rules! impl_convert {
};
}

for_all_scalar_variants! { impl_convert }
for_all_variants! { impl_convert }

// Implement `From<raw float>` for `ScalarImpl::Float` as a sugar.
impl From<f32> for ScalarImpl {
@@ -1092,16 +1091,16 @@ pub fn literal_type_match(data_type: &DataType, literal: Option<&ScalarImpl>) ->
match literal {
Some(scalar) => {
macro_rules! matches {
($( { $DataType:ident, $PhysicalType:ident }),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty }),*) => {
match (data_type, scalar) {
$(
(DataType::$DataType { .. }, ScalarImpl::$PhysicalType(_)) => true,
(DataType::$DataType { .. }, _) => false, // so that we won't forget to match a new logical type
(DataType::$data_type { .. }, ScalarImpl::$variant_name(_)) => true,
(DataType::$data_type { .. }, _) => false, // so that we won't forget to match a new logical type
)*
}
}
}
for_all_type_pairs! { matches }
for_all_variants! { matches }
}
None => true,
}
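
The same convention covers callbacks that only need the logical-type/physical-variant pairing, such as `literal_type_match` above and `schema_check` below, which previously went through `for_all_type_pairs!`. A simplified, hypothetical sketch with toy `DataType`/`ScalarImpl` enums (not the real ones from `risingwave_common`):

macro_rules! for_all_variants {
    ($macro:ident $(, $x:tt)*) => {
        $macro! {
            $($x, )*
            { Int16, Int16, int16, i16, i16, Vec<i16>, Vec<i16> },
            { Float64, Float64, float64, f64, f64, Vec<f64>, Vec<f64> }
        }
    };
}

enum DataType { Int16, Float64 }
enum ScalarImpl { Int16(i16), Float64(f64) }

fn literal_type_match(data_type: &DataType, scalar: &ScalarImpl) -> bool {
    // Only `$data_type` and `$variant_name` are used; the other five captures are ignored.
    macro_rules! matches_impl {
        ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
            match (data_type, scalar) {
                $(
                    (DataType::$data_type { .. }, ScalarImpl::$variant_name(_)) => true,
                    (DataType::$data_type { .. }, _) => false,
                )*
            }
        }
    }
    for_all_variants! { matches_impl }
}

fn main() {
    assert!(literal_type_match(&DataType::Int16, &ScalarImpl::Int16(1)));
    assert!(literal_type_match(&DataType::Float64, &ScalarImpl::Float64(1.0)));
    assert!(!literal_type_match(&DataType::Float64, &ScalarImpl::Int16(1)));
    println!("type pairs matched as expected");
}

The per-type catch-all false arm keeps the match exhaustive for each logical type, which is the point of the "so that we won't forget to match a new logical type" comment in the diff above.
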
8 changes: 4 additions & 4 deletions src/common/src/util/schema_check.rs
@@ -15,7 +15,7 @@
use itertools::Itertools;

use crate::array::{ArrayImpl, ArrayRef};
use crate::for_all_type_pairs;
use crate::for_all_variants;
use crate::types::DataType;

/// Check if the schema of `columns` matches the expected `data_types`. Used for debugging.
@@ -30,9 +30,9 @@
.enumerate()
{
macro_rules! matches {
($( { $DataType:ident, $PhysicalType:ident }),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty }),*) => {
match (pair.as_ref().left(), pair.as_ref().right()) {
$( (Some(DataType::$DataType { .. }), Some(ArrayImpl::$PhysicalType(_))) => continue, )*
$( (Some(DataType::$data_type { .. }), Some(ArrayImpl::$variant_name(_))) => continue, )*
(data_type, array) => {
let array_ident = array.map(|a| a.get_ident());
return Err(format!(
@@ -43,7 +43,7 @@
}
}

for_all_type_pairs! { matches }
for_all_variants! { matches }
}

Ok(())
3 changes: 1 addition & 2 deletions src/compute/src/server.rs
@@ -130,8 +130,7 @@ pub async fn compute_node_serve(
},
&config.meta,
)
.await
.unwrap();
.await;

let state_store_url = system_params.state_store();

2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
@@ -62,7 +62,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'.";
Property::default(),
&MetaConfig::default(),
)
.await?;
.await;
let worker_id = client.worker_id();
tracing::info!("registered as RiseCtl worker, worker_id = {}", worker_id);
Ok(client)
6 changes: 3 additions & 3 deletions src/expr/core/src/expr/value.rs
@@ -14,7 +14,7 @@

use either::Either;
use risingwave_common::array::*;
use risingwave_common::for_all_array_variants;
use risingwave_common::for_all_variants;
use risingwave_common::types::{Datum, DatumRef, Scalar, ToDatumRef};

/// The type-erased return value of an expression.
@@ -79,7 +79,7 @@ impl<'a, A: Array> ValueRef<'a, A> {
}

macro_rules! impl_convert {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
paste::paste! {
/// Converts a type-erased value to a reference of a specific array type.
@@ -102,4 +102,4 @@
};
}

for_all_array_variants! { impl_convert }
for_all_variants! { impl_convert }
14 changes: 14 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
@@ -1067,6 +1067,20 @@
sql: |
CREATE TABLE t (v1 int, v2 int);
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) as x, count(*), max(v2) as m2, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v2) as y from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test approx_percentile hash_agg forced should use single phase agg
sql: |
SET RW_FORCE_TWO_PHASE_AGG=true;
create table t (v1 int, grp_col int);
select approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by grp_col;
expected_outputs:
- stream_error
- name: test approx percentile with default relative_error
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- stream_plan
24 changes: 24 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -2103,3 +2103,27 @@
└─StreamHashAgg { group_key: [$expr5], aggs: [sum(t.v1), count, max(t.v2)] }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr3, t.v2, t.v2::Float64 as $expr4, t._row_id, Vnode(t._row_id) as $expr5] }
└─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test approx_percentile hash_agg forced should use single phase agg
sql: |
SET RW_FORCE_TWO_PHASE_AGG=true;
create table t (v1 int, grp_col int);
select approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t group by grp_col;
stream_error: |-
Feature is not yet implemented: two-phase streaming approx percentile aggregation with group key, please use single phase aggregation instead
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
- name: test approx percentile with default relative_error
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5) WITHIN GROUP (order by v1) from t;
logical_plan: |-
LogicalProject { exprs: [approx_percentile($expr1)] }
└─LogicalAgg { aggs: [approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamExchange { dist: Single }
└─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }