Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): enhance parquet file source #19221

Merged
merged 13 commits into from
Nov 12, 2024
63 changes: 50 additions & 13 deletions e2e_test/s3/file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ def gen_data(file_num, item_num_per_file):
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')),
'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')),
'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')),
'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')),
'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')),
'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
Expand Down Expand Up @@ -62,8 +68,15 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz

) WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -130,8 +143,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand All @@ -148,7 +167,7 @@ def _table():
print('Sink into s3 in parquet encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
id bigint primary key,
id bigint primary key,\
name TEXT,
sex bigint,
mark bigint,
Expand All @@ -159,8 +178,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
Expand Down Expand Up @@ -198,8 +223,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 'snowflake',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -227,8 +258,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
Expand Down
184 changes: 163 additions & 21 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

use std::fmt::Write;

use arrow_53_schema::TimeUnit;
use arrow_array::array;
use arrow_array::cast::AsArray;
use arrow_buffer::OffsetBuffer;
Expand Down Expand Up @@ -512,6 +513,12 @@ pub trait FromArrow {
Time64(Microsecond) => DataType::Time,
Timestamp(Microsecond, None) => DataType::Timestamp,
Timestamp(Microsecond, Some(_)) => DataType::Timestamptz,
Timestamp(Second, None) => DataType::Timestamp,
Timestamp(Second, Some(_)) => DataType::Timestamptz,
Timestamp(Millisecond, None) => DataType::Timestamp,
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
Timestamp(Nanosecond, None) => DataType::Timestamp,
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
Comment on lines 514 to +521
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timestamp(_, None) => DataType::Timestamp,
Timestamp(_, Some(_)) => DataType::Timestamptz,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer to keep time unit designated as a reminder, there are four types of timeunit that need to be handled. Previously, other types were ignored because of the use of _.

Copy link
Collaborator

@hzxa21 hzxa21 Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we use concrete type Microsecond instead of _ so it was ignored not because of the use of _ but the contrast. I don't have a strong opinion on this so listing the types explicitly here is also okay. I am more concerned about having separate logics to handle the different time unit separately given that our original implementation won't lose any precision. See my comment below.

Copy link
Contributor Author

@wcy-fdu wcy-fdu Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't lose any precision

explained in #19221 (comment)

Interval(MonthDayNano) => DataType::Interval,
Utf8 => DataType::Varchar,
Binary => DataType::Bytea,
Expand Down Expand Up @@ -572,7 +579,6 @@ pub trait FromArrow {
if let Some(type_name) = field.metadata().get("ARROW:extension:name") {
return self.from_extension_array(type_name, array);
}

match array.data_type() {
Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()),
Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()),
Expand All @@ -584,12 +590,30 @@ pub trait FromArrow {
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
Timestamp(Second, None) => {
self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Second, Some(_)) => {
self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, None) => {
self.from_timestampms_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Millisecond, Some(_)) => {
self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR but I wonder why we ignore the timezone (Some(_)) from arrow timestamp when constructing Timestamptz

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced in #17201

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we have encountered three issues regarding the conversion of the timestamp type:

  1. When converting Arrow's timestamp type to our timestamp, we should decide whether to convert it to timestamp or timestampz based on whether there is a timezone (none or some).
  2. When converting Arrow's timestamp type to our timestamp, only microseconds were converted, while the other three time units were not converted.
  3. In type comparisons, Arrow's timestamp(_, none) should match with rw’s timestamp, while timestamp(_, some) should match with rw’s timestamptz. However, previously, apart from microseconds, the other units did not match.

#17201 fix 1, and this pr fix 2 and 3.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. Correct me if I am wrong, I think 1 is not fixed completely because we don't use the correct timezone for timestamptz based on the arrow data type.

Copy link
Contributor Author

@wcy-fdu wcy-fdu Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, indeed this is another bug: during the conversion, the timezone should be handled using Arrow's timezone(), as we are currently using the default timezone. I will fix this in the next PR. Thanks to @hzxa21 for identifying this issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checked, their is no correctness issue:
When the timezone in Arrow is Some(_), the i64 always stores UTC time, which is consistent with PG/RW's timestamptz. Therefore, we only need to distinguish between None and Some(_), as the contents of Some do not affect the actual value; they are merely for display purposes.
refer to https://docs.rs/arrow-schema/53.2.0/arrow_schema/enum.DataType.html#timestamps-with-a-non-empty-timezone

self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, None) => {
self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, Some(_)) => {
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
}
Expand Down Expand Up @@ -692,6 +716,33 @@ pub trait FromArrow {
Ok(ArrayImpl::Time(array.into()))
}

fn from_timestampsecond_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}
fn from_timestampsecond_some_array(
&self,
array: &arrow_array::TimestampSecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampms_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampms_some_array(
&self,
array: &arrow_array::TimestampMillisecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampus_array(
&self,
array: &arrow_array::TimestampMicrosecondArray,
Expand All @@ -706,6 +757,20 @@ pub trait FromArrow {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampns_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampns_some_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_interval_array(
&self,
array: &arrow_array::IntervalMonthDayNanoArray,
Expand Down Expand Up @@ -842,6 +907,44 @@ macro_rules! converts {
}
};
}

macro_rules! converts_with_timeunit {
($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => {

impl From<&$ArrayType> for $ArrowType {
fn from(array: &$ArrayType) -> Self {
array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect()
}
}

impl From<&$ArrowType> for $ArrayType {
fn from(array: &$ArrowType) -> Self {
array.iter().map(|o| {
o.map(|v| {
let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit);
timestamp
})
}).collect()
}
}

impl From<&[$ArrowType]> for $ArrayType {
fn from(arrays: &[$ArrowType]) -> Self {
arrays
.iter()
.flat_map(|a| a.iter())
.map(|o| {
o.map(|v| {
<<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit)
})
})
.collect()
}
}

};
}

converts!(BoolArray, arrow_array::BooleanArray);
converts!(I16Array, arrow_array::Int16Array);
converts!(I32Array, arrow_array::Int32Array);
Expand All @@ -854,11 +957,19 @@ converts!(Utf8Array, arrow_array::StringArray);
converts!(Utf8Array, arrow_array::LargeStringArray);
converts!(DateArray, arrow_array::Date32Array, @map);
converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map);
converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map);
converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map);
converts!(SerialArray, arrow_array::Int64Array, @map);

converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

/// Converts RisingWave value from and into Arrow value.
trait FromIntoArrow {
/// The corresponding element type in the Arrow array.
Expand All @@ -867,6 +978,16 @@ trait FromIntoArrow {
fn into_arrow(self) -> Self::ArrowType;
}

/// Converts RisingWave value from and into Arrow value.
/// Specifically used for converting timestamp types according to timeunit.
trait FromIntoArrowWithUnit {
type ArrowType;
/// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp.
type TimestampType;
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self;
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType;
}

impl FromIntoArrow for Serial {
type ArrowType = i64;

Expand Down Expand Up @@ -936,34 +1057,55 @@ impl FromIntoArrow for Time {
}
}

impl FromIntoArrow for Timestamp {
impl FromIntoArrowWithUnit for Timestamp {
type ArrowType = i64;
type TimestampType = TimeUnit;

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamp(
DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _)
.unwrap()
.naive_utc(),
)
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
match time_unit {
TimeUnit::Second => {
Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
}
TimeUnit::Millisecond => {
Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
}
TimeUnit::Microsecond => {
Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
}
TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()),
}
}

fn into_arrow(self) -> Self::ArrowType {
self.0
.signed_duration_since(NaiveDateTime::default())
.num_microseconds()
.unwrap()
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
match time_unit {
TimeUnit::Second => self.0.and_utc().timestamp(),
TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(),
TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(),
TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(),
}
}
}

impl FromIntoArrow for Timestamptz {
impl FromIntoArrowWithUnit for Timestamptz {
type ArrowType = i64;

fn from_arrow(value: Self::ArrowType) -> Self {
Timestamptz::from_micros(value)
type TimestampType = TimeUnit;

fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
match time_unit {
TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(),
TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(),
TimeUnit::Microsecond => Timestamptz::from_micros(value),
TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(),
}
}

fn into_arrow(self) -> Self::ArrowType {
self.timestamp_micros()
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType {
match time_unit {
TimeUnit::Second => self.timestamp(),
TimeUnit::Millisecond => self.timestamp_millis(),
TimeUnit::Microsecond => self.timestamp_micros(),
TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(),
}
}
}

Expand Down
Loading
Loading