-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support expression to compute iceberg sink partition value #14470
Conversation
Cargo.lock
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, why Cargo.lock has so many updates? Especially why syn
, serde
is updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's because avro
specified the minor number (and uses dependabot) and thus forces downstream to update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a blocker. But want to take a look at the CHANGELOG of syn
, serde
, thiserror
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add mod comments to describe what does "external" mean?
/// This module contains the iceberg expression for computing the partition value. | ||
/// spec ref: <https://iceberg.apache.org/spec/#partition-transforms> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use //!
and put at the beginning of the file
proto/expr.proto
Outdated
|
||
// EXTERNAL | ||
ICEBERG_BUCKET = 2201; | ||
ICEBERG_TRUNCATE = 2202; | ||
ICEBERG_YEAR = 2203; | ||
ICEBERG_MONTH = 2204; | ||
ICEBERG_DAY = 2205; | ||
ICEBERG_HOUR = 2206; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little surprising that this many expressions are needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review note: big picture is here IcebergPartitionInfo::convert_to_expression
https://github.com/risingwavelabs/risingwave/pull/14135/files#diff-fca15fe3e7a30b34c86607302831aab3e5354163a89ccedc5d36a167d60f4b91R89-R191
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain a bit why a single function, maybe like iceberg_partition_value
isn't enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, is it because the "input" isn't exprs, but iceberg "transform"s, so it's not easy to construct? Hmmm 🧐
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, is it because the "input" isn't exprs, but iceberg "transform"s, so it's not easy to construct? Hmmm 🧐
Yes. Another reason is that different transform function has different signatures.
e.g.
bucket(input,n) -> i32
, input should be int, long, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, binary
.
truncate(input,w) -> input
, input should be int, long, decimal, string
.
Actually, we also can use a function like 🤔
iceberg_transform(type, metadata, input) -> any
But then we need to add lots of check logic in a single function like following and this will make this function too complex I think.
fn iceberg_transform<T>(type:i32, metadata: T, input:..) {
switch type {
bucket => ,
truncate => ,
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I find a way to avoid this, let me have a try.🥵
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new implementation have avoid it now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add some unit tests and e2e tests for these functions?
6d55b5c
to
c6455cc
Compare
I have added the unit test. And these expressions don't expect to be called by the user directly so we didn't support them in frontend. (But maybe it's an interesting feature.😜) The e2e test will be postponed to the next PR in sink test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
} | ||
|
||
async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
unimplemented!() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will panic, I would suggest to return feature not supported error.
use risingwave_expr::{build_function, Result}; | ||
|
||
#[derive(Debug)] | ||
pub enum IcebergTransformType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to reuse the Transform
in icelake
so that we don't need to maintain two sets of enums?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess defining a new enum is because we need a fixed mapping between transform types and integers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess defining a new enum is because we need a fixed mapping between transform types and integers?
This way maybe a function would be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl<'a> ToString for &'a Transform {
fn to_string(&self) -> String {
match self {
Transform::Identity => "identity".to_string(),
Transform::Year => "year".to_string(),
Transform::Month => "month".to_string(),
Transform::Day => "day".to_string(),
Transform::Hour => "hour".to_string(),
Transform::Void => "void".to_string(),
Transform::Bucket(length) => format!("bucket[{}]", length),
Transform::Truncate(width) => format!("truncate[{}]", width),
}
}
}
I find that we can use string so that we don't even need extra function meta param( all info contains in the string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's Just serializaion/deserializatiom
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
c6455cc
to
5647507
Compare
Cargo.lock
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
let datum = children[0].eval_const()?.unwrap(); | ||
let str = datum.as_utf8(); | ||
Transform::from_str(str).map_err(|_| ExprError::InvalidParam { | ||
name: "transform type in icberg_transform", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name: "transform type in icberg_transform", | |
name: "Invalid transform type in icberg_transform", |
For now, the signature is oh, I guess we can just use new_unchecked to bypass type infer. But maybe we can have a way to custom |
Transform::Year | Transform::Month | Transform::Day | Transform::Hour => { | ||
IcebergTransform::check_time(&return_type, &children)? | ||
} | ||
Transform::Identity | Transform::Void => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why identity and void is illegal? Identity means user has calculated partition value for us, so I think it should be valid. Void
is a little complex, if all partition fields are void, it should be treated as unpartitioned. But if only one field is partitioned, we should return null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For identity and void, we can directly represent them in the project as InputRef
and const null. It can avoid extra expression compute and data convert, but I'm not sure whether it's a good practice...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean do this in optimizer? Sounds reasonable to me, but please add some comment here.
} | ||
|
||
impl IcebergTransform { | ||
fn check_bucket(return_type: &DataType, children: &Vec<BoxedExpression>) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to use the return_type
api in Transform
to do the verification so that we can keep consistency
Yes, the auto-generated #[build_function("...", type_infer = "...")] But at present we don't call |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally LGTM
5647507
to
3d218ce
Compare
To avoid use type_infer, I mark it as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
// Get the child array | ||
let array = self.child.eval(data_chunk).await?; | ||
// Convert to arrow array | ||
let arrow_array = array.as_ref().try_into().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why unwrap
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will convert it as an error in the next PR.
cc @wangrunji0408 @xxchan Any other comments? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
) Co-authored-by: ZENOTME <[email protected]> Co-authored-by: Renjie Liu <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR is the first part of #13898. For the whole picture, can refer the draft PR: #14135.
This PR introduces two expression to compute partition value:
For now, the transform function in icelake only support to take a arrow array as input. In the future, we will refatocr it as a function which take a value as input so that we can implement these external function using the expression macro in expr/impl. Like:
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.