refactor(source): enhance parquet file source #19221

Merged
merged 13 commits into from
Nov 12, 2024

Conversation

Contributor

@wcy-fdu wcy-fdu commented Oct 31, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR primarily focuses on the conversion of the Timestamp type. Arrow's Timestamp type supports four time units:


pub enum TimeUnit {
    /// Time in seconds.
    Second,
    /// Time in milliseconds.
    Millisecond,
    /// Time in microseconds.
    Microsecond,
    /// Time in nanoseconds.
    Nanosecond,
}

Previously, however, we handled only Microsecond. This PR makes the comparison of data types more reasonable:

  • When checking data types during column pruning, make risingwave_common::types::DataType::Timestamp match arrow_schema_iceberg::DataType::Timestamp(_, _).
  • When converting an Arrow field to an RW field, also handle the Nanosecond, Millisecond, and Second units.
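
A minimal sketch of the type check described in the bullets above. The enums are hypothetical stand-ins defined locally (they are not the real `risingwave_common` or `arrow_schema_iceberg` types), and the split between `None` and `Some(_)` timezones is an assumption taken from the later discussion in this thread rather than from the PR diff:

```rust
/// Hypothetical mirror of Arrow's `TimeUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Hypothetical, reduced stand-in for the Arrow data type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArrowType {
    Int64,
    /// Time unit plus an optional timezone string.
    Timestamp(TimeUnit, Option<String>),
}

/// Hypothetical, reduced stand-in for the RisingWave data type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwType {
    Int64,
    Timestamp,
    Timestamptz,
}

/// Returns true when a RisingWave column type is compatible with an Arrow field type.
pub fn is_compatible(rw: RwType, arrow: &ArrowType) -> bool {
    match (rw, arrow) {
        (RwType::Int64, ArrowType::Int64) => true,
        // Any time unit is accepted; only the presence of a timezone matters.
        (RwType::Timestamp, ArrowType::Timestamp(_, None)) => true,
        (RwType::Timestamptz, ArrowType::Timestamp(_, Some(_))) => true,
        _ => false,
    }
}
```

With this shape, a Nanosecond or Second column is no longer rejected at column pruning just because it is not Microsecond.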

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@graphite-app graphite-app bot requested review from a team October 31, 2024 14:23
@wcy-fdu wcy-fdu requested review from hzxa21 and chenzl25 November 1, 2024 06:20
@chenzl25 chenzl25 requested a review from xxchan November 1, 2024 08:07
Comment on lines 712 to 737
fn from_timestampsecond_array(
    &self,
    array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
    Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampsecond_some_array(
    &self,
    array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
    Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_array(
    &self,
    array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
    Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_some_array(
    &self,
    array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
    Ok(ArrayImpl::Timestamptz(array.into()))
}
Contributor

I am curious, we don't need to do any conversion between these types? All of them can be implemented in the same way?

Contributor Author

No, I just added this logic and verified that it works. I constructed a Parquet file containing timestamps in all four time units, imported it into RisingWave, and it was parsed successfully:
image

Collaborator

I also don't get why we need separate methods to convert timestamps in different units. IIUC, the current conversion logic won't lose any precision. What would happen if we just added the other timestamp units to the converts macro and relied on the original implementation to do the conversion?

converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampSecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map);

Contributor Author

In our previous code, we only handled timestamps in microseconds, so this conversion is only correct for Timestamp(Microsecond, _). As you can see in that logic, both the seconds part and the sub-second part are derived using 1_000_000. If all four time units are converted using this logic, it will yield incorrect results, as follows:
image

Contributor Author

The correct approach is the one shown in the PR, where each time unit is handled differently. For example, for milliseconds, it should be % 1_000 instead of % 1_000_000:

TimeUnit::Millisecond => Timestamp(
    DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1_000_000) as _)
        .unwrap()
        .naive_utc(),
),

So I think specifying the time unit is essential.
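
To make the difference concrete, here is a small std-only sketch of a unit-aware split into seconds and nanoseconds. The `TimeUnit` enum and `split_timestamp` function are hypothetical and defined locally so the snippet compiles on its own. It uses Euclidean division, which is an assumption on my side and not what the PR snippet above does, so that values before 1970 still produce a non-negative nanosecond part:

```rust
/// Hypothetical mirror of Arrow's `TimeUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Splits a raw Arrow timestamp into
/// (seconds since the epoch, nanoseconds within that second).
pub fn split_timestamp(value: i64, unit: TimeUnit) -> (i64, u32) {
    // Number of ticks of `unit` in one second.
    let per_sec: i64 = match unit {
        TimeUnit::Second => 1,
        TimeUnit::Millisecond => 1_000,
        TimeUnit::Microsecond => 1_000_000,
        TimeUnit::Nanosecond => 1_000_000_000,
    };
    // Euclidean division keeps the remainder in 0..per_sec, even for negative input.
    let secs = value.div_euclid(per_sec);
    // Scale the remainder up to nanoseconds; the result is always below 1_000_000_000.
    let nanos = value.rem_euclid(per_sec) * (1_000_000_000 / per_sec);
    (secs, nanos as u32)
}
```

Feeding the millisecond value `1_700_000_000_123` through the Millisecond arm gives `(1_700_000_000, 123_000_000)`, while pushing the same value through the Microsecond arm gives `(1_700_000, 123_000)`, which lands in January 1970 instead of November 2023.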

Comment on lines 973 to 983
/// Converts RisingWave value from and into Arrow value.
trait FromIntoArrow {
    /// The corresponding element type in the Arrow array.
    type ArrowType;
    /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
    type TimestampType;
    fn from_arrow(value: Self::ArrowType) -> Self;
    fn into_arrow(self) -> Self::ArrowType;
    /// Used for converting timestamp types and will not be used in conversions of other types.
    fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
}
Member

Generally speaking, for type conversion, schema (i.e., type information) should be provided, since data itself may not be self-contained.

fn convert(v_from, t_from, t_to) -> v_to 

t_from & t_to are not always needed though

e.g., our Avro conversion

pub fn convert_to_datum<'b>(
    &self,
    value: &'b Value,
    type_expected: &DataType,
) -> AccessResult<DatumCow<'b>>

Member

Therefore, _with_unit looks a little strange to me. Haven't checked the details of Arrow yet.

Member

cc @xiangjinwu @BugenZhao might also be able to comment

Contributor

https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html
If we need _with_unit to pass extra info from DataType, how about the following?

  • IntervalUnit
  • i32 length for FixedSizeBinary
  • precision and scale for Decimal{128,256}

To repeat, DataType should be provided as data itself may not be self-contained.
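
A rough sketch of the `convert(v_from, t_from) -> v_to` shape discussed above. All types here (`SourceType`, `Value`, `convert`) are hypothetical and heavily reduced; they only illustrate that the same raw `i64` means different things depending on the source type, so the type has to travel with the value:

```rust
/// Hypothetical, heavily reduced stand-in for a source data type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceType {
    TimestampMillis,
    TimestampMicros,
    /// Decimal stored as an unscaled integer; the scale lives in the type.
    Decimal { scale: u32 },
}

/// Hypothetical target value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Value {
    /// Microseconds since the Unix epoch.
    TimestampMicros(i64),
    /// Unscaled integer together with the scale taken from the source type.
    Decimal { unscaled: i64, scale: u32 },
}

/// Converts a raw integer using the source type as the schema.
/// Returns `None` when the value does not fit the target representation.
pub fn convert(raw: i64, from: &SourceType) -> Option<Value> {
    match from {
        // Milliseconds are scaled up; overflow is reported instead of wrapping.
        SourceType::TimestampMillis => raw.checked_mul(1_000).map(Value::TimestampMicros),
        SourceType::TimestampMicros => Some(Value::TimestampMicros(raw)),
        // The scale is not recoverable from the data alone.
        SourceType::Decimal { scale } => Some(Value::Decimal {
            unscaled: raw,
            scale: *scale,
        }),
    }
}
```

The point of passing the whole type, rather than a dedicated `_with_unit` argument, is that the same signature also covers things like decimal scale without growing a new method per kind of extra information.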

e2e_test/s3/fs_sink.py Outdated Show resolved Hide resolved
Comment on lines 911 to 944
($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {

    impl From<&$ArrayType> for $ArrowType {
        fn from(array: &$ArrayType) -> Self {
            array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect()
        }
    }

    impl From<&$ArrowType> for $ArrayType {
        fn from(array: &$ArrowType) -> Self {
            array.iter().map(|o| {
                o.map(|v| {
                    let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow_with_unit(v, $time_unit);
                    timestamp
                })
            }).collect()
        }
    }

    impl From<&[$ArrowType]> for $ArrayType {
        fn from(arrays: &[$ArrowType]) -> Self {
            arrays
                .iter()
                .flat_map(|a| a.iter())
                .map(|o| {
                    o.map(|v| {
                        <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v)
                    })
                })
                .collect()
        }
    }

};
Member

@stdrc stdrc Nov 8, 2024

This looks very specialized. What about putting it in a new macro, like converts_timestamp?

Comment on lines 977 to 978
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
type TimestampType;
Member

Why is this needed? I guess it's too intrusive to put timestamp related things in every type?

Contributor Author

We need to distinguish the four timestamp types of arrow, instead of just using one i64 to represent them.

Comment on lines 514 to +521
Timestamp(Microsecond, None) => DataType::Timestamp,
Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
Timestamp(Second, None) => DataType::Timestamp,
Timestamp(Second, Some(_)) => DataType::Timestamptz,
Timestamp(Millisecond, None) => DataType::Timestamp,
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
Timestamp(Nanosecond, None) => DataType::Timestamp,
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
Collaborator

Timestamp(_, None) => DataType::Timestamp,
Timestamp(_, Some(_)) => DataType::Timestamptz,

Contributor Author

I prefer to keep the time units listed explicitly as a reminder that there are four TimeUnit variants to handle. Previously, the other variants were ignored because of the use of _.

Collaborator

@hzxa21 hzxa21 Nov 11, 2024

Previously we used the concrete type Microsecond instead of _, so the other units were ignored not because of _ but for the opposite reason. I don't have a strong opinion on this, so listing the types explicitly here is also okay. I am more concerned about having separate logic for each time unit, given that our original implementation won't lose any precision. See my comment below.

Contributor Author

@wcy-fdu wcy-fdu Nov 11, 2024

won't lose any precision

explained in #19221 (comment)

@wcy-fdu
Contributor Author

wcy-fdu commented Nov 11, 2024

To avoid intruding on the FromIntoArrow trait, the timestamp conversion is split into a separate trait, FromIntoArrowWithUnit, and a new macro, converts_with_timeunit!, is added as well, which is much clearer than before. As for why it is necessary to specify a time unit, see #19221 (comment)
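
For readers following along, here is a self-contained sketch of how such a split could look. Only the names `FromIntoArrowWithUnit` and `converts_with_timeunit!` come from the comment above; the trait shape, the toy `Micros` value, and the toy array wrappers are assumptions made for illustration and are not the code in `arrow_impl.rs`:

```rust
/// Hypothetical mirror of Arrow's `TimeUnit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Unit-aware conversion, kept apart from the general conversion trait.
pub trait FromIntoArrowWithUnit: Sized {
    type ArrowType;
    fn from_arrow_with_unit(value: Self::ArrowType, unit: TimeUnit) -> Self;
    fn into_arrow_with_unit(self, unit: TimeUnit) -> Self::ArrowType;
}

/// Toy timestamp: microseconds since the Unix epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Micros(pub i64);

impl FromIntoArrowWithUnit for Micros {
    type ArrowType = i64;

    // Plain multiplication for brevity; real code would need overflow handling.
    fn from_arrow_with_unit(value: i64, unit: TimeUnit) -> Self {
        match unit {
            TimeUnit::Second => Micros(value * 1_000_000),
            TimeUnit::Millisecond => Micros(value * 1_000),
            TimeUnit::Microsecond => Micros(value),
            TimeUnit::Nanosecond => Micros(value.div_euclid(1_000)),
        }
    }

    fn into_arrow_with_unit(self, unit: TimeUnit) -> i64 {
        match unit {
            TimeUnit::Second => self.0.div_euclid(1_000_000),
            TimeUnit::Millisecond => self.0.div_euclid(1_000),
            TimeUnit::Microsecond => self.0,
            TimeUnit::Nanosecond => self.0 * 1_000,
        }
    }
}

/// Toy source arrays, one wrapper per Arrow time unit, plus the target array.
pub struct MillisArray(pub Vec<Option<i64>>);
pub struct SecondsArray(pub Vec<Option<i64>>);
#[derive(Debug, PartialEq)]
pub struct MicrosArray(pub Vec<Option<Micros>>);

/// Generates `From<&Source>` for the target array, binding the unit once per source type.
macro_rules! converts_with_timeunit {
    ($target:ident, $source:ident, $unit:expr) => {
        impl From<&$source> for $target {
            fn from(array: &$source) -> Self {
                $target(
                    array
                        .0
                        .iter()
                        .map(|&o| o.map(|v| Micros::from_arrow_with_unit(v, $unit)))
                        .collect(),
                )
            }
        }
    };
}

converts_with_timeunit!(MicrosArray, MillisArray, TimeUnit::Millisecond);
converts_with_timeunit!(MicrosArray, SecondsArray, TimeUnit::Second);
```

Types that have nothing to do with timestamps never see the extra trait, which addresses the concern raised earlier about putting timestamp-related items into every conversion impl.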

@wcy-fdu wcy-fdu requested review from hzxa21 and chenzl25 November 11, 2024 09:30
src/common/src/array/arrow/arrow_impl.rs Outdated Show resolved Hide resolved
src/common/src/array/arrow/arrow_impl.rs Outdated Show resolved Hide resolved
src/common/src/array/arrow/arrow_impl.rs Outdated Show resolved Hide resolved
src/common/src/array/arrow/arrow_impl.rs Outdated Show resolved Hide resolved
Comment on lines 1094 to 1111
TimeUnit::Second => self
    .0
    .signed_duration_since(NaiveDateTime::default())
    .num_seconds(),
TimeUnit::Millisecond => self
    .0
    .signed_duration_since(NaiveDateTime::default())
    .num_milliseconds(),
TimeUnit::Microsecond => self
    .0
    .signed_duration_since(NaiveDateTime::default())
    .num_microseconds()
    .unwrap(),
TimeUnit::Nanosecond => self
    .0
    .signed_duration_since(NaiveDateTime::default())
    .num_nanoseconds()
    .unwrap(),
Collaborator

I think you can use DateTime::timestamp_xxx() here: https://doc.servo.org/chrono/struct.DateTime.html#method.timestamp_micros

Collaborator

This is not resolved.

@wcy-fdu wcy-fdu requested a review from hzxa21 November 12, 2024 02:17
Collaborator

@hzxa21 hzxa21 left a comment

Rest LGTM

}
Timestamp(Millisecond, Some(_)) => {
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
Collaborator

Not directly related to this PR but I wonder why we ignore the timezone (Some(_)) from arrow timestamp when constructing Timestamptz

Collaborator

Introduced in #17201

Contributor Author

Actually, we have encountered three issues regarding the conversion of the timestamp type:

  1. When converting Arrow's timestamp type to ours, we should decide whether to convert it to timestamp or timestamptz based on whether a timezone is present (None or Some).
  2. When converting Arrow's timestamp type to ours, only microseconds were converted; the other three time units were not.
  3. In type comparisons, Arrow's Timestamp(_, None) should match RW's timestamp, while Timestamp(_, Some) should match RW's timestamptz. Previously, however, units other than microseconds did not match.

#17201 fixes 1, and this PR fixes 2 and 3.

Collaborator

I understand. Correct me if I am wrong, I think 1 is not fixed completely because we don't use the correct timezone for timestamptz based on the arrow data type.

Contributor Author

@wcy-fdu wcy-fdu Nov 12, 2024

Discussed offline; this is indeed another bug: during the conversion, the timezone should be taken from Arrow's timezone(), whereas we currently use the default timezone. I will fix this in the next PR. Thanks to @hzxa21 for identifying this issue.

Contributor Author

Double-checked: there is no correctness issue.
When the timezone in Arrow is Some(_), the i64 always stores UTC time, which is consistent with PG/RW's timestamptz. Therefore, we only need to distinguish between None and Some(_), as the contents of Some do not affect the actual value; they are merely for display purposes.
See https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html#timestamps-with-a-non-empty-timezone

TimeUnit::Second => self.timestamp(),
TimeUnit::Millisecond => self.timestamp_millis(),
TimeUnit::Microsecond => self.timestamp_micros(),
TimeUnit::Nanosecond => self.timestamp_nanos().unwrap_or_default(),
Collaborator

Should we use unwrap instead of unwrap_or_default?

Contributor Author

I think it's better not to panic when the type conversion hits a problem. Converting to 0 by mistake is better than panicking a cluster.

Collaborator

If our TimestampTz is in microsecond unit, I think we should never overflow?

Contributor Author

Yes, checked_mul(1_000) on a microsecond value is safe, so we can unwrap() safely.
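
For reference, a std-only sketch of the handling options weighed in this thread when scaling microseconds to nanoseconds. The function names are hypothetical, and the third option (returning an error) was not proposed in the discussion; it is included only as an assumption about what a non-panicking, non-silent variant could look like. Whether overflow is actually reachable depends on the range of values the microsecond timestamp can hold, which this sketch does not try to settle:

```rust
/// Scales microseconds since the epoch to nanoseconds, reporting overflow as `None`.
pub fn micros_to_nanos(micros: i64) -> Option<i64> {
    micros.checked_mul(1_000)
}

/// Option 1 (`unwrap_or_default`): overflow silently becomes 0, i.e. 1970-01-01.
pub fn to_nanos_or_default(micros: i64) -> i64 {
    micros_to_nanos(micros).unwrap_or_default()
}

/// Option 2 (`unwrap`): overflow panics, which is only acceptable if it cannot happen.
pub fn to_nanos_or_panic(micros: i64) -> i64 {
    micros_to_nanos(micros).unwrap()
}

/// Option 3 (assumption, not from the thread): surface the failure to the caller.
pub fn to_nanos_checked(micros: i64) -> Result<i64, String> {
    micros_to_nanos(micros)
        .ok_or_else(|| format!("timestamp {micros}us does not fit in i64 nanoseconds"))
}
```

The `i64` boundary sits at `i64::MAX / 1_000` microseconds, so any input above that makes `checked_mul` return `None` and the three variants diverge.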
