Skip to content

Commit

Permalink
refactor(expr): separate function implementations into a new crate (#…
Browse files Browse the repository at this point in the history
…12485)

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Sep 26, 2023
1 parent 09a1dcb commit d583594
Show file tree
Hide file tree
Showing 208 changed files with 1,960 additions and 1,907 deletions.
50 changes: 38 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ members = [
"src/compute",
"src/connector",
"src/ctl",
"src/expr",
"src/expr/core",
"src/expr/impl",
"src/expr/macro",
"src/frontend",
"src/frontend/planner_test",
Expand Down Expand Up @@ -125,7 +126,8 @@ risingwave_compactor = { path = "./src/storage/compactor" }
risingwave_compute = { path = "./src/compute" }
risingwave_ctl = { path = "./src/ctl" }
risingwave_connector = { path = "./src/connector" }
risingwave_expr = { path = "./src/expr" }
risingwave_expr = { path = "./src/expr/core" }
risingwave_expr_impl = { path = "./src/expr/impl" }
risingwave_frontend = { path = "./src/frontend" }
risingwave_hummock_sdk = { path = "./src/storage/hummock_sdk" }
risingwave_hummock_test = { path = "./src/storage/hummock_test" }
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio", "async"] }
rand = "0.8"
risingwave_expr_impl = { workspace = true }
tempfile = "3"

[target.'cfg(unix)'.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/batch/benches/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
use risingwave_common::{enable_jemalloc_on_unix, hash};
use risingwave_expr::agg::{AggCall, AggKind};
use risingwave_expr::aggregate::{AggCall, AggKind};
use risingwave_pb::expr::{PbAggCall, PbInputRef};
use tokio::runtime::Runtime;
use utils::{create_input, execute_executor};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::agg::{
use risingwave_expr::aggregate::{
AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction,
};
use risingwave_expr::Result;
Expand Down Expand Up @@ -112,7 +112,7 @@ mod tests {
use risingwave_common::array::StreamChunk;
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::{Datum, Decimal};
use risingwave_expr::agg::AggCall;
use risingwave_expr::aggregate::AggCall;

use super::super::build;

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/aggregation/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use risingwave_common::array::StreamChunk;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::agg::{AggregateFunction, AggregateState, BoxedAggregateFunction};
use risingwave_expr::aggregate::{AggregateFunction, AggregateState, BoxedAggregateFunction};
use risingwave_expr::expr::Expression;
use risingwave_expr::Result;

Expand Down Expand Up @@ -74,7 +74,7 @@ impl AggregateFunction for Filter {
#[cfg(test)]
mod tests {
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_expr::agg::{build_append_only, AggCall};
use risingwave_expr::aggregate::{build_append_only, AggCall};
use risingwave_expr::expr::{build_from_pretty, Expression, LiteralExpression};

use super::*;
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod filter;
mod orderby;
mod projection;

use risingwave_expr::agg::{self, AggCall, BoxedAggregateFunction};
use risingwave_expr::aggregate::{build_append_only, AggCall, BoxedAggregateFunction};
use risingwave_expr::Result;

use self::distinct::Distinct;
Expand All @@ -30,7 +30,7 @@ use self::projection::Projection;

/// Build an `BoxedAggregateFunction` from `AggCall`.
pub fn build(agg: &AggCall) -> Result<BoxedAggregateFunction> {
let mut aggregator = agg::build_append_only(agg)?;
let mut aggregator = build_append_only(agg)?;

if agg.distinct {
aggregator = Box::new(Distinct::new(aggregator));
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/aggregation/orderby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::memcmp_encoding;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::agg::{
use risingwave_expr::aggregate::{
AggStateDyn, AggregateFunction, AggregateState, BoxedAggregateFunction,
};
use risingwave_expr::{ExprError, Result};
Expand Down Expand Up @@ -151,7 +151,7 @@ impl AggregateFunction for ProjectionOrderBy {
mod tests {
use risingwave_common::array::{ListValue, StreamChunk};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_expr::agg::AggCall;
use risingwave_expr::aggregate::AggCall;

use super::super::build;

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/aggregation/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::Range;

use risingwave_common::array::StreamChunk;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::agg::{AggregateFunction, AggregateState, BoxedAggregateFunction};
use risingwave_expr::aggregate::{AggregateFunction, AggregateState, BoxedAggregateFunction};
use risingwave_expr::Result;

pub struct Projection {
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::agg::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;

Expand Down
14 changes: 7 additions & 7 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
6 2 ^10:42:00
7 1 ^10:51:00
8 3 ^11:02:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
);
let mut mock_executor = MockExecutor::new(schema.clone());
mock_executor.add(chunk);
Expand Down Expand Up @@ -326,7 +326,7 @@ mod tests {
6 2 ^10:42:00 ^10:14:00 ^10:44:00
7 1 ^10:51:00 ^10:29:00 ^10:59:00
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);
assert_eq!(
Expand All @@ -341,7 +341,7 @@ mod tests {
6 2 ^10:42:00 ^10:14:00 ^10:44:00
7 1 ^10:51:00 ^10:29:00 ^10:59:00
8 3 ^11:02:00 ^10:44:00 ^11:14:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);
}
Expand Down Expand Up @@ -371,7 +371,7 @@ mod tests {
6 2 ^10:42:00 ^10:15:00 ^10:45:00
7 1 ^10:51:00 ^10:30:00 ^11:00:00
8 3 ^11:02:00 ^10:45:00 ^11:15:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);

Expand All @@ -388,7 +388,7 @@ mod tests {
6 2 ^10:42:00 ^10:30:00 ^11:00:00
7 1 ^10:51:00 ^10:45:00 ^11:15:00
8 3 ^11:02:00 ^11:00:00 ^11:30:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);
}
Expand All @@ -415,7 +415,7 @@ mod tests {
2 ^10:15:00 ^10:45:00 ^10:42:00
1 ^10:30:00 ^11:00:00 ^10:51:00
3 ^10:45:00 ^11:15:00 ^11:02:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);

Expand All @@ -432,7 +432,7 @@ mod tests {
2 ^10:30:00 ^11:00:00 ^10:42:00
1 ^10:45:00 ^11:15:00 ^10:51:00
3 ^11:00:00 ^11:30:00 ^11:02:00"
.replace('^', "2022-2-2T"),
.replace('^', "2022-02-02T"),
)
);
}
Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ mod tests {
2 5.5 2 5.5
2 8.4 2 5.5",
);
let condition = build_from_pretty("(less_than:boolean (cast:float4 5:int4) $3:float4)");
let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

do_test(JoinType::Inner, Some(condition), false, expected).await;
}
Expand All @@ -709,7 +709,7 @@ mod tests {
5 9.1 . .
. . . .",
);
let condition = build_from_pretty("(less_than:boolean (cast:float4 5:int4) $3:float4)");
let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

do_test(JoinType::LeftOuter, Some(condition), false, expected).await;
}
Expand All @@ -722,7 +722,7 @@ mod tests {
2 5.5
2 8.4",
);
let condition = build_from_pretty("(less_than:boolean (cast:float4 5:int4) $3:float4)");
let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

do_test(JoinType::LeftSemi, Some(condition), false, expected).await;
}
Expand All @@ -736,7 +736,7 @@ mod tests {
5 9.1
. .",
);
let condition = build_from_pretty("(less_than:boolean (cast:float4 5:int4) $3:float4)");
let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

do_test(JoinType::LeftAnti, Some(condition), false, expected).await;
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/sort_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::array::{Array, ArrayBuilderImpl, ArrayImpl, DataChunk, St
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::agg::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_expr::aggregate::{AggCall, AggregateState, BoxedAggregateFunction};
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

Expand Down
3 changes: 3 additions & 0 deletions src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ pub mod task;
extern crate tracing;
#[macro_use]
extern crate risingwave_common;

#[cfg(test)]
risingwave_expr_impl::enable!();
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ risingwave_common = { workspace = true }
risingwave_compactor = { workspace = true }
risingwave_compute = { workspace = true }
risingwave_ctl = { workspace = true }
risingwave_expr_impl = { workspace = true }
risingwave_frontend = { workspace = true }
risingwave_meta = { workspace = true }
risingwave_rt = { workspace = true }
Expand Down
Loading

0 comments on commit d583594

Please sign in to comment.