From 2301faef66141747ef4838e7ff5d9962589c7fd2 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 31 Oct 2024 21:44:41 +0800 Subject: [PATCH 01/12] enhance parquet source and sink --- e2e_test/s3/fs_sink.py | 45 ++++----- src/common/src/array/arrow/arrow_impl.rs | 66 ++++++++++++- src/connector/src/parser/parquet_parser.rs | 29 ++---- .../opendal_source/opendal_reader.rs | 92 ++++++++++++++++++- 4 files changed, 184 insertions(+), 48 deletions(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index 344b1b807d7e4..2769e24541ac4 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -18,17 +18,10 @@ def gen_data(file_num, item_num_per_file): [{ 'id': file_id * item_num_per_file + item_id, 'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - 'test_int': pa.scalar(1, type=pa.int32()), - 'test_real': pa.scalar(4.0, type=pa.float32()), - 'test_double_precision': pa.scalar(5.0, type=pa.float64()), - 'test_varchar': pa.scalar('7', type=pa.string()), - 'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()), - 'test_date': pa.scalar(datetime.now().date(), type=pa.date32()), - 'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')), - 'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), - 'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')), + 'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')), + 'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), + 'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')), } for item_id in range(item_num_per_file)] for file_id in range(file_num) ] @@ -60,8 +53,10 @@ def _table(): test_bytea bytea, test_date date, 
test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -128,8 +123,10 @@ def _table(): test_bytea, test_date, test_time, - test_timestamp, - test_timestamptz + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -158,8 +155,10 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp ) WITH ( connector = 's3', match_pattern = 'test_parquet_sink/*.parquet', @@ -196,8 +195,10 @@ def _table(): test_bytea, test_date, test_time, - test_timestamp, - test_timestamptz + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -226,8 +227,10 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp ) WITH ( connector = 's3', match_pattern = 'test_json_sink/*.json', diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 3095461a2ebc5..c55e002e708ec 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -572,7 +572,6 @@ pub trait FromArrow { if let Some(type_name) = field.metadata().get("ARROW:extension:name") { return self.from_extension_array(type_name, array); } - match array.data_type() { Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()), Int16 => 
self.from_int16_array(array.as_any().downcast_ref().unwrap()), @@ -584,12 +583,30 @@ pub trait FromArrow { Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()), Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()), + Timestamp(Second, None) => { + self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Second, Some(_)) => { + self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, None) => { + self.from_timestampms_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, Some(_)) => { + self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap()) + } Timestamp(Microsecond, None) => { self.from_timestampus_array(array.as_any().downcast_ref().unwrap()) } Timestamp(Microsecond, Some(_)) => { self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap()) } + Timestamp(Nanosecond, None) => { + self.from_timestampns_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Nanosecond, Some(_)) => { + self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap()) + } Interval(MonthDayNano) => { self.from_interval_array(array.as_any().downcast_ref().unwrap()) } @@ -692,6 +709,33 @@ pub trait FromArrow { Ok(ArrayImpl::Time(array.into())) } + fn from_timestampsecond_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + fn from_timestampsecond_some_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampms_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampms_some_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + 
Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_timestampus_array( &self, array: &arrow_array::TimestampMicrosecondArray, @@ -706,6 +750,20 @@ pub trait FromArrow { Ok(ArrayImpl::Timestamptz(array.into())) } + fn from_timestampns_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampns_some_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_interval_array( &self, array: &arrow_array::IntervalMonthDayNanoArray, @@ -854,8 +912,14 @@ converts!(Utf8Array, arrow_array::StringArray); converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); +converts!(TimestampArray, arrow_array::TimestampSecondArray, @map); +converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map); converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map); +converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map); +converts!(TimestamptzArray, arrow_array::TimestampSecondArray, @map); +converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray, @map); converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map); +converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, @map); converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs index db2ace3d2b6dd..f656555d7c119 100644 --- a/src/connector/src/parser/parquet_parser.rs +++ b/src/connector/src/parser/parquet_parser.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use deltalake::parquet::arrow::async_reader::AsyncFileReader; use futures_async_stream::try_stream; use 
risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch; -use risingwave_common::array::arrow::{arrow_schema_iceberg, IcebergArrowConvert}; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk}; use risingwave_common::bail; use risingwave_common::types::{Datum, ScalarImpl}; @@ -104,32 +104,19 @@ impl ParquetParser { crate::source::SourceColumnType::Normal => { match source_column.is_hidden_addition_col { false => { - let rw_data_type = &source_column.data_type; + let rw_data_type: &risingwave_common::types::DataType = + &source_column.data_type; let rw_column_name = &source_column.name; + if let Some(parquet_column) = record_batch.column_by_name(rw_column_name) { let arrow_field = IcebergArrowConvert .to_arrow_field(rw_column_name, rw_data_type)?; - let converted_arrow_data_type: &arrow_schema_iceberg::DataType = - arrow_field.data_type(); - if converted_arrow_data_type == parquet_column.data_type() { - let array_impl = IcebergArrowConvert - .array_from_arrow_array(&arrow_field, parquet_column)?; - let column = Arc::new(array_impl); - chunk_columns.push(column); - } else { - // data type mismatch, this column is set to null. - let mut array_builder = ArrayBuilderImpl::with_type( - column_size, - rw_data_type.clone(), - ); - - array_builder.append_n_null(record_batch.num_rows()); - let res = array_builder.finish(); - let column = Arc::new(res); - chunk_columns.push(column); - } + let array_impl = IcebergArrowConvert + .array_from_arrow_array(&arrow_field, parquet_column)?; + let column = Arc::new(array_impl); + chunk_columns.push(column); } else { // For columns defined in the source schema but not present in the Parquet file, null values are filled in. 
let mut array_builder = diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 69308a092e2dd..38767be656eba 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -24,7 +24,7 @@ use opendal::Operator; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::FileMetaData; -use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::array::arrow::arrow_schema_iceberg; use risingwave_common::array::StreamChunk; use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::io::{AsyncRead, BufReader}; @@ -269,10 +269,10 @@ pub fn extract_valid_column_indices( .iter() .position(|&name| name == column.name) .and_then(|pos| { - let arrow_field = IcebergArrowConvert - .to_arrow_field(&column.name, &column.data_type) - .ok()?; - if &arrow_field == converted_arrow_schema.field(pos) { + if is_data_type_matching( + &column.data_type, + converted_arrow_schema.field(pos).data_type(), + ) { Some(pos) } else { None @@ -285,3 +285,85 @@ pub fn extract_valid_column_indices( None => Ok(vec![]), } } + +/// Checks if the data type in RisingWave matches the data type in a Parquet(arrow) file. +/// +/// This function compares the `DataType` from RisingWave with the `DataType` from +/// Parquet file, returning `true` if they are compatible. Specifically, for `Timestamp` +/// types, it ensures that any of the four `TimeUnit` variants from Parquet +/// (i.e., `Second`, `Millisecond`, `Microsecond`, and `Nanosecond`) can be matched +/// with the corresponding `Timestamp` type in RisingWave. 
+pub fn is_data_type_matching( + rw_data_type: &risingwave_common::types::DataType, + arrow_data_type: &arrow_schema_iceberg::DataType, +) -> bool { + match rw_data_type { + risingwave_common::types::DataType::Boolean => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Boolean) + } + risingwave_common::types::DataType::Int16 => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int16) + } + risingwave_common::types::DataType::Int32 => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int32) + } + risingwave_common::types::DataType::Int64 => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int64) + } + risingwave_common::types::DataType::Float32 => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float32) + } + risingwave_common::types::DataType::Float64 => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float64) + } + risingwave_common::types::DataType::Decimal => { + matches!( + arrow_data_type, + arrow_schema_iceberg::DataType::Decimal128(_, _) + ) || matches!( + arrow_data_type, + arrow_schema_iceberg::DataType::Decimal256(_, _) + ) + } + risingwave_common::types::DataType::Date => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date32) + || matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date64) + } + risingwave_common::types::DataType::Varchar => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Utf8) + } + risingwave_common::types::DataType::Time => { + matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time32(_)) + || matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time64(_)) + } + risingwave_common::types::DataType::Timestamp => { + matches!( + arrow_data_type, + arrow_schema_iceberg::DataType::Timestamp(_, _) + ) + } + risingwave_common::types::DataType::Timestamptz => { + matches!( + arrow_data_type, + arrow_schema_iceberg::DataType::Timestamp(_, _) + ) + } + risingwave_common::types::DataType::Interval => { + 
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Interval(_)) + } + risingwave_common::types::DataType::List(inner_type) => { + if let arrow_schema_iceberg::DataType::List(field_ref) = arrow_data_type { + let inner_rw_type = inner_type.clone(); + let inner_arrow_type = field_ref.data_type(); + is_data_type_matching(&inner_rw_type, inner_arrow_type) + } else { + false + } + } + risingwave_common::types::DataType::Map(_) => { + // Directly return false for Map types + false + } + _ => false, // Handle other data types as necessary + } +} From c86a8cef641db6e1acd2b52e3216b13ad56bb421 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 1 Nov 2024 14:20:04 +0800 Subject: [PATCH 02/12] minor --- e2e_test/s3/fs_sink.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index 2769e24541ac4..c5312647ba0d3 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -18,6 +18,15 @@ def gen_data(file_num, item_num_per_file): [{ 'id': file_id * item_num_per_file + item_id, 'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}', + 'sex': item_id % 2, + 'mark': (-1) ** (item_id % 2), + 'test_int': pa.scalar(1, type=pa.int32()), + 'test_real': pa.scalar(4.0, type=pa.float32()), + 'test_double_precision': pa.scalar(5.0, type=pa.float64()), + 'test_varchar': pa.scalar('7', type=pa.string()), + 'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()), + 'test_date': pa.scalar(datetime.now().date(), type=pa.date32()), + 'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')), 'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')), 'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')), 'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), From a1d1a9b3e7bb59bf30551f184d1b8bdea08dd6cf Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Sun, 3 Nov 2024 
16:48:37 +0800 Subject: [PATCH 03/12] handle Map type --- .../filesystem/opendal_source/opendal_reader.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 38767be656eba..575a644c05a45 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -360,8 +360,20 @@ pub fn is_data_type_matching( false } } - risingwave_common::types::DataType::Map(_) => { - // Directly return false for Map types + risingwave_common::types::DataType::Map(map_type) => { + if let arrow_schema_iceberg::DataType::Map(field_ref, _) = arrow_data_type { + let key_rw_type = map_type.key(); + let value_rw_type = map_type.value(); + let struct_type = field_ref.data_type(); + if let arrow_schema_iceberg::DataType::Struct(fields) = struct_type { + if fields.len() == 2 { + let key_arrow_type = fields[0].data_type(); + let value_arrow_type = fields[1].data_type(); + return is_data_type_matching(key_rw_type, key_arrow_type) + && is_data_type_matching(value_rw_type, value_arrow_type); + } + } + } false } _ => false, // Handle other data types as necessary From d804208cfbdeee8662f4f14b9adc17c8c9c5acef Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 5 Nov 2024 14:14:47 +0800 Subject: [PATCH 04/12] fix convert fro nanos, millis, secound --- e2e_test/s3/fs_sink.py | 37 +++++- src/common/src/array/arrow/arrow_impl.rs | 138 ++++++++++++++++++++--- src/common/src/types/timestamptz.rs | 5 + 3 files changed, 158 insertions(+), 22 deletions(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index c5312647ba0d3..3971eefa91045 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -31,6 +31,10 @@ def gen_data(file_num, item_num_per_file): 'test_timestamp_ms': 
pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')), 'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), 'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')), + 'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')), + 'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')), + 'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')), } for item_id in range(item_num_per_file)] for file_id in range(file_num) ] @@ -65,7 +69,12 @@ def _table(): test_timestamp_s timestamp, test_timestamp_ms timestamp, test_timestamp_us timestamp, - test_timestamp_ns timestamp + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamp, + test_timestamptz_us timestamp, + test_timestamptz_ns timestamp + ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -135,7 +144,11 @@ def _table(): test_timestamp_s, test_timestamp_ms, test_timestamp_us, - test_timestamp_ns + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -153,7 +166,7 @@ def _table(): print('Sink into s3 in parquet encode...') # Execute a SELECT statement cur.execute(f'''CREATE TABLE test_parquet_sink_table( - id bigint primary key, + id bigint primary key,\ name TEXT, sex bigint, mark bigint, @@ -167,7 +180,11 @@ def _table(): test_timestamp_s timestamp, test_timestamp_ms timestamp, test_timestamp_us timestamp, - test_timestamp_ns timestamp + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamp, + test_timestamptz_us timestamp, + test_timestamptz_ns timestamp 
) WITH ( connector = 's3', match_pattern = 'test_parquet_sink/*.parquet', @@ -207,7 +224,11 @@ def _table(): test_timestamp_s, test_timestamp_ms, test_timestamp_us, - test_timestamp_ns + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -239,7 +260,11 @@ def _table(): test_timestamp_s timestamp, test_timestamp_ms timestamp, test_timestamp_us timestamp, - test_timestamp_ns timestamp + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamp, + test_timestamptz_us timestamp, + test_timestamptz_ns timestamp ) WITH ( connector = 's3', match_pattern = 'test_json_sink/*.json', diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index c55e002e708ec..f84582aa1b37e 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,6 +42,7 @@ use std::fmt::Write; +use arrow_53_schema::TimeUnit; use arrow_array::array; use arrow_array::cast::AsArray; use arrow_buffer::OffsetBuffer; @@ -848,6 +849,7 @@ impl From<&Bitmap> for arrow_buffer::NullBuffer { } /// Implement bi-directional `From` between concrete array types. + macro_rules! converts { ($ArrayType:ty, $ArrowType:ty) => { impl From<&$ArrayType> for $ArrowType { @@ -899,7 +901,43 @@ macro_rules! 
converts { } } }; + + ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { + + impl From<&$ArrayType> for $ArrowType { + fn from(array: &$ArrayType) -> Self { + array.iter().map(|o| o.map(|v| v.into_arrow())).collect() + } + } + + impl From<&$ArrowType> for $ArrayType { + fn from(array: &$ArrowType) -> Self { + array.iter().map(|o| { + o.map(|v| { + let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow_with_unit(v, $time_unit); + timestamp + }) + }).collect() + } + } + + impl From<&[$ArrowType]> for $ArrayType { + fn from(arrays: &[$ArrowType]) -> Self { + arrays + .iter() + .flat_map(|a| a.iter()) + .map(|o| { + o.map(|v| { + <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v) + }) + }) + .collect() + } + } + + }; } + converts!(BoolArray, arrow_array::BooleanArray); converts!(I16Array, arrow_array::Int16Array); converts!(I32Array, arrow_array::Int32Array); @@ -912,14 +950,17 @@ converts!(Utf8Array, arrow_array::StringArray); converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampSecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampMillisecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampNanosecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampSecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, @map); + +converts!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); +converts!(TimestampArray, 
arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + +converts!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); +converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); @@ -927,12 +968,15 @@ converts!(SerialArray, arrow_array::Int64Array, @map); trait FromIntoArrow { /// The corresponding element type in the Arrow array. type ArrowType; + type TimestampType: std::fmt::Debug; fn from_arrow(value: Self::ArrowType) -> Self; fn into_arrow(self) -> Self::ArrowType; + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; } impl FromIntoArrow for Serial { type ArrowType = i64; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -941,10 +985,15 @@ impl FromIntoArrow for Serial { fn into_arrow(self) -> Self::ArrowType { self.into() } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl FromIntoArrow for F32 { type ArrowType = f32; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -953,10 +1002,15 @@ impl FromIntoArrow for F32 { fn into_arrow(self) -> Self::ArrowType { self.into() } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl FromIntoArrow for F64 { type ArrowType = f64; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -965,10 +1019,15 @@ impl FromIntoArrow for F64 { fn 
into_arrow(self) -> Self::ArrowType { self.into() } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl FromIntoArrow for Date { type ArrowType = i32; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { Date(arrow_array::types::Date32Type::to_naive_date(value)) @@ -977,10 +1036,15 @@ impl FromIntoArrow for Date { fn into_arrow(self) -> Self::ArrowType { arrow_array::types::Date32Type::from_naive_date(self.0) } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl FromIntoArrow for Time { type ArrowType = i64; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { Time( @@ -998,17 +1062,18 @@ impl FromIntoArrow for Time { .num_microseconds() .unwrap() } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl FromIntoArrow for Timestamp { type ArrowType = i64; + type TimestampType = TimeUnit; - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamp( - DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) - .unwrap() - .naive_utc(), - ) + fn from_arrow(_value: Self::ArrowType) -> Self { + unreachable!() } fn into_arrow(self) -> Self::ArrowType { @@ -1017,22 +1082,59 @@ impl FromIntoArrow for Timestamp { .num_microseconds() .unwrap() } + + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Nanosecond => Timestamp( + DateTime::from_timestamp( + (value / 1_000_000_000) as _, + (value % 1_000_000_000) as _, + ) + .unwrap() + .naive_utc(), + ), + TimeUnit::Microsecond => Timestamp( + DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000) as _) + .unwrap() + .naive_utc(), + ), + TimeUnit::Millisecond => Timestamp( + DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1_000) as _) + .unwrap() + 
.naive_utc(), + ), + TimeUnit::Second => { + Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) + } + } + } } impl FromIntoArrow for Timestamptz { type ArrowType = i64; + type TimestampType = TimeUnit; - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamptz::from_micros(value) + fn from_arrow(_value: Self::ArrowType) -> Self { + unreachable!() } fn into_arrow(self) -> Self::ArrowType { self.timestamp_micros() } + + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), + TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(), + TimeUnit::Microsecond => Timestamptz::from_micros(value), + TimeUnit::Nanosecond => Timestamptz::from_nans(value).unwrap_or_default(), + } + } } impl FromIntoArrow for Interval { type ArrowType = ArrowIntervalType; + type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { ::to_interval(value) @@ -1041,6 +1143,10 @@ impl FromIntoArrow for Interval { fn into_arrow(self) -> Self::ArrowType { ::from_interval(self) } + + fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { + unreachable!() + } } impl From<&DecimalArray> for arrow_array::LargeBinaryArray { diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 34a6bd2465271..6d322eea5e172 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -102,6 +102,11 @@ impl Timestamptz { Self(timestamp_micros) } + /// Creates a `Timestamptz` from microseconds. + pub fn from_nans(timestamp_nanos: i64) -> Option { + timestamp_nanos.checked_div(1_000).map(Self) + } + /// Returns the number of non-leap-microseconds since January 1, 1970 UTC. 
pub fn timestamp_micros(&self) -> i64 { self.0 From 01d102cd050af4de9b98f9a452a3b61bf30bed1a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 5 Nov 2024 14:41:34 +0800 Subject: [PATCH 05/12] fix Millisecond --- e2e_test/s3/fs_sink.py | 18 +-- src/common/src/array/arrow/arrow_impl.rs | 32 +++--- .../opendal_source/opendal_reader.rs | 106 ++---------------- 3 files changed, 35 insertions(+), 121 deletions(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index 3971eefa91045..0e91eab33ac2c 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -71,9 +71,9 @@ def _table(): test_timestamp_us timestamp, test_timestamp_ns timestamp, test_timestamptz_s timestamptz, - test_timestamptz_ms timestamp, - test_timestamptz_us timestamp, - test_timestamptz_ns timestamp + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', @@ -182,9 +182,9 @@ def _table(): test_timestamp_us timestamp, test_timestamp_ns timestamp, test_timestamptz_s timestamptz, - test_timestamptz_ms timestamp, - test_timestamptz_us timestamp, - test_timestamptz_ns timestamp + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', match_pattern = 'test_parquet_sink/*.parquet', @@ -262,9 +262,9 @@ def _table(): test_timestamp_us timestamp, test_timestamp_ns timestamp, test_timestamptz_s timestamptz, - test_timestamptz_ms timestamp, - test_timestamptz_us timestamp, - test_timestamptz_ns timestamp + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', match_pattern = 'test_json_sink/*.json', diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index f84582aa1b37e..07a5eaae2db71 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -513,6 +513,12 @@ pub 
trait FromArrow { Time64(Microsecond) => DataType::Time, Timestamp(Microsecond, None) => DataType::Timestamp, Timestamp(Microsecond, Some(_)) => DataType::Timestamptz, + Timestamp(Second, None) => DataType::Timestamp, + Timestamp(Second, Some(_)) => DataType::Timestamptz, + Timestamp(Millisecond, None) => DataType::Timestamp, + Timestamp(Millisecond, Some(_)) => DataType::Timestamptz, + Timestamp(Nanosecond, None) => DataType::Timestamp, + Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz, Interval(MonthDayNano) => DataType::Interval, Utf8 => DataType::Varchar, Binary => DataType::Bytea, @@ -1085,6 +1091,19 @@ impl FromIntoArrow for Timestamp { fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { match time_unit { + TimeUnit::Second => { + Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) + } + TimeUnit::Millisecond => Timestamp( + DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1000000) as _) + .unwrap() + .naive_utc(), + ), + TimeUnit::Microsecond => Timestamp( + DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) + .unwrap() + .naive_utc(), + ), TimeUnit::Nanosecond => Timestamp( DateTime::from_timestamp( (value / 1_000_000_000) as _, @@ -1093,19 +1112,6 @@ impl FromIntoArrow for Timestamp { .unwrap() .naive_utc(), ), - TimeUnit::Microsecond => Timestamp( - DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000) as _) - .unwrap() - .naive_utc(), - ), - TimeUnit::Millisecond => Timestamp( - DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1_000) as _) - .unwrap() - .naive_utc(), - ), - TimeUnit::Second => { - Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) - } } } } diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 575a644c05a45..14258d8924658 100644 --- 
a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -24,7 +24,7 @@ use opendal::Operator; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::FileMetaData; -use risingwave_common::array::arrow::arrow_schema_iceberg; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::io::{AsyncRead, BufReader}; @@ -269,10 +269,12 @@ pub fn extract_valid_column_indices( .iter() .position(|&name| name == column.name) .and_then(|pos| { - if is_data_type_matching( - &column.data_type, - converted_arrow_schema.field(pos).data_type(), - ) { + // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison. + // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw. + let arrow_filed_to_rw_data_type = IcebergArrowConvert + .type_from_field(converted_arrow_schema.field(pos)) + .ok()?; + if arrow_filed_to_rw_data_type == column.data_type { Some(pos) } else { None @@ -285,97 +287,3 @@ pub fn extract_valid_column_indices( None => Ok(vec![]), } } - -/// Checks if the data type in RisingWave matches the data type in a Parquet(arrow) file. -/// -/// This function compares the `DataType` from RisingWave with the `DataType` from -/// Parquet file, returning `true` if they are compatible. Specifically, for `Timestamp` -/// types, it ensures that any of the four `TimeUnit` variants from Parquet -/// (i.e., `Second`, `Millisecond`, `Microsecond`, and `Nanosecond`) can be matched -/// with the corresponding `Timestamp` type in RisingWave. 
-pub fn is_data_type_matching( - rw_data_type: &risingwave_common::types::DataType, - arrow_data_type: &arrow_schema_iceberg::DataType, -) -> bool { - match rw_data_type { - risingwave_common::types::DataType::Boolean => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Boolean) - } - risingwave_common::types::DataType::Int16 => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int16) - } - risingwave_common::types::DataType::Int32 => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int32) - } - risingwave_common::types::DataType::Int64 => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Int64) - } - risingwave_common::types::DataType::Float32 => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float32) - } - risingwave_common::types::DataType::Float64 => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Float64) - } - risingwave_common::types::DataType::Decimal => { - matches!( - arrow_data_type, - arrow_schema_iceberg::DataType::Decimal128(_, _) - ) || matches!( - arrow_data_type, - arrow_schema_iceberg::DataType::Decimal256(_, _) - ) - } - risingwave_common::types::DataType::Date => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date32) - || matches!(arrow_data_type, arrow_schema_iceberg::DataType::Date64) - } - risingwave_common::types::DataType::Varchar => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Utf8) - } - risingwave_common::types::DataType::Time => { - matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time32(_)) - || matches!(arrow_data_type, arrow_schema_iceberg::DataType::Time64(_)) - } - risingwave_common::types::DataType::Timestamp => { - matches!( - arrow_data_type, - arrow_schema_iceberg::DataType::Timestamp(_, _) - ) - } - risingwave_common::types::DataType::Timestamptz => { - matches!( - arrow_data_type, - arrow_schema_iceberg::DataType::Timestamp(_, _) - ) - } - risingwave_common::types::DataType::Interval => { - 
matches!(arrow_data_type, arrow_schema_iceberg::DataType::Interval(_)) - } - risingwave_common::types::DataType::List(inner_type) => { - if let arrow_schema_iceberg::DataType::List(field_ref) = arrow_data_type { - let inner_rw_type = inner_type.clone(); - let inner_arrow_type = field_ref.data_type(); - is_data_type_matching(&inner_rw_type, inner_arrow_type) - } else { - false - } - } - risingwave_common::types::DataType::Map(map_type) => { - if let arrow_schema_iceberg::DataType::Map(field_ref, _) = arrow_data_type { - let key_rw_type = map_type.key(); - let value_rw_type = map_type.value(); - let struct_type = field_ref.data_type(); - if let arrow_schema_iceberg::DataType::Struct(fields) = struct_type { - if fields.len() == 2 { - let key_arrow_type = fields[0].data_type(); - let value_arrow_type = fields[1].data_type(); - return is_data_type_matching(key_rw_type, key_arrow_type) - && is_data_type_matching(value_rw_type, value_arrow_type); - } - } - } - false - } - _ => false, // Handle other data types as necessary - } -} From 7c92a3f3af90f9044df0dd63795ee9421af5e346 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 5 Nov 2024 14:47:16 +0800 Subject: [PATCH 06/12] fmt --- e2e_test/s3/fs_sink.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/s3/fs_sink.py b/e2e_test/s3/fs_sink.py index 0e91eab33ac2c..b068b479d8f83 100644 --- a/e2e_test/s3/fs_sink.py +++ b/e2e_test/s3/fs_sink.py @@ -74,7 +74,7 @@ def _table(): test_timestamptz_ms timestamptz, test_timestamptz_us timestamptz, test_timestamptz_ns timestamptz - + ) WITH ( connector = 's3', match_pattern = '*.parquet', From 61e79a1d8aaf48345452936d86bdb0fe12f77d6a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 5 Nov 2024 15:01:16 +0800 Subject: [PATCH 07/12] add some comments --- src/common/src/array/arrow/arrow_impl.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs 
b/src/common/src/array/arrow/arrow_impl.rs index 07a5eaae2db71..388457068d9f4 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -974,9 +974,11 @@ converts!(SerialArray, arrow_array::Int64Array, @map); trait FromIntoArrow { /// The corresponding element type in the Arrow array. type ArrowType; - type TimestampType: std::fmt::Debug; + /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. + type TimestampType; fn from_arrow(value: Self::ArrowType) -> Self; fn into_arrow(self) -> Self::ArrowType; + /// Used for converting timestamp types and will not be used in conversions of other types. fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; } From 69c8320b4ecbd01e01ee2d4b613bfd13b5173e73 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 7 Nov 2024 16:10:19 +0800 Subject: [PATCH 08/12] add into_arrow_with_unit --- src/common/src/array/arrow/arrow_impl.rs | 62 +++++++++++++++++++++++- src/common/src/types/timestamptz.rs | 7 ++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 388457068d9f4..1fe5cfc07c908 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -912,7 +912,7 @@ macro_rules! converts { impl From<&$ArrayType> for $ArrowType { fn from(array: &$ArrayType) -> Self { - array.iter().map(|o| o.map(|v| v.into_arrow())).collect() + array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect() } } @@ -980,6 +980,8 @@ trait FromIntoArrow { fn into_arrow(self) -> Self::ArrowType; /// Used for converting timestamp types and will not be used in conversions of other types. 
fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; + /// Used for converting timestamp types and will not be used in conversions of other types. + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; } impl FromIntoArrow for Serial { @@ -997,6 +999,10 @@ impl FromIntoArrow for Serial { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl FromIntoArrow for F32 { @@ -1014,6 +1020,10 @@ impl FromIntoArrow for F32 { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl FromIntoArrow for F64 { @@ -1031,6 +1041,10 @@ impl FromIntoArrow for F64 { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl FromIntoArrow for Date { @@ -1048,6 +1062,10 @@ impl FromIntoArrow for Date { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl FromIntoArrow for Time { @@ -1074,6 +1092,10 @@ impl FromIntoArrow for Time { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl FromIntoArrow for Timestamp { @@ -1116,6 +1138,29 @@ impl FromIntoArrow for Timestamp { ), } } + + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + 
TimeUnit::Second => self + .0 + .signed_duration_since(NaiveDateTime::default()) + .num_seconds(), + TimeUnit::Millisecond => self + .0 + .signed_duration_since(NaiveDateTime::default()) + .num_milliseconds(), + TimeUnit::Microsecond => self + .0 + .signed_duration_since(NaiveDateTime::default()) + .num_microseconds() + .unwrap(), + TimeUnit::Nanosecond => self + .0 + .signed_duration_since(NaiveDateTime::default()) + .num_nanoseconds() + .unwrap(), + } + } } impl FromIntoArrow for Timestamptz { @@ -1135,7 +1180,16 @@ impl FromIntoArrow for Timestamptz { TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(), TimeUnit::Microsecond => Timestamptz::from_micros(value), - TimeUnit::Nanosecond => Timestamptz::from_nans(value).unwrap_or_default(), + TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(), + } + } + + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.timestamp(), + TimeUnit::Millisecond => self.timestamp_millis(), + TimeUnit::Microsecond => self.timestamp_micros(), + TimeUnit::Nanosecond => self.timestamp_nanos().unwrap_or_default(), } } } @@ -1155,6 +1209,10 @@ impl FromIntoArrow for Interval { fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { unreachable!() } + + fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { + unreachable!() + } } impl From<&DecimalArray> for arrow_array::LargeBinaryArray { diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 6d322eea5e172..11ea22154e97d 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -103,7 +103,7 @@ impl Timestamptz { } /// Creates a `Timestamptz` from microseconds. 
- pub fn from_nans(timestamp_nanos: i64) -> Option { + pub fn from_nanos(timestamp_nanos: i64) -> Option { timestamp_nanos.checked_div(1_000).map(Self) } @@ -117,6 +117,11 @@ impl Timestamptz { self.0.div_euclid(1_000) } + /// Returns the number of non-leap-nanoseconds since January 1, 1970 UTC. + pub fn timestamp_nanos(&self) -> Option { + self.0.checked_mul(1_000) + } + /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC (aka "UNIX /// timestamp"). pub fn timestamp(&self) -> i64 { From 857d86d9043a0cb6fd15a66e6f0bf7d93b5290df Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 7 Nov 2024 16:20:41 +0800 Subject: [PATCH 09/12] minor --- src/common/src/array/arrow/arrow_impl.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 1fe5cfc07c908..f17adc694029e 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -980,7 +980,8 @@ trait FromIntoArrow { fn into_arrow(self) -> Self::ArrowType; /// Used for converting timestamp types and will not be used in conversions of other types. fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; - /// Used for converting timestamp types and will not be used in conversions of other types. + /// Used for converting rw timestamp types to arrow timestamp type. + /// In actual calls, due to compatibility, it will only be converted to microsecond.
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; } From 8d033f795853d186f1b5db8d71892f2f6c903422 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 11 Nov 2024 17:25:25 +0800 Subject: [PATCH 10/12] split into two macros and trait --- src/common/src/array/arrow/arrow_impl.rs | 112 +++++------------------ 1 file changed, 22 insertions(+), 90 deletions(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index f17adc694029e..8c752a1820436 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -907,7 +907,9 @@ macro_rules! converts { } } }; +} +macro_rules! converts_with_timeunit { ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { impl From<&$ArrayType> for $ArrowType { @@ -920,7 +922,7 @@ macro_rules! converts { fn from(array: &$ArrowType) -> Self { array.iter().map(|o| { o.map(|v| { - let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow_with_unit(v, $time_unit); + let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit); timestamp }) }).collect() @@ -934,7 +936,7 @@ macro_rules! 
converts { .flat_map(|a| a.iter()) .map(|o| { o.map(|v| { - <<$ArrayType as Array>::RefItem<'_> as FromIntoArrow>::from_arrow(v) + <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit) }) }) .collect() @@ -957,15 +959,15 @@ converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); -converts!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); -converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); -converts!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); -converts!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); -converts!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); -converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); -converts!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); 
+converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); @@ -974,20 +976,22 @@ converts!(SerialArray, arrow_array::Int64Array, @map); trait FromIntoArrow { /// The corresponding element type in the Arrow array. type ArrowType; - /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. - type TimestampType; fn from_arrow(value: Self::ArrowType) -> Self; fn into_arrow(self) -> Self::ArrowType; - /// Used for converting timestamp types and will not be used in conversions of other types. +} + +/// Converts RisingWave value from and into Arrow value. +/// Specifically used for converting timestamp types according to timeunit. +trait FromIntoArrowWithUnit { + type ArrowType; + /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. + type TimestampType; fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; - /// Used for converting rw timestamp types to arrow timestamp type. - /// In actual calls, due to compatibility, it will only be converted to microsecond. 
fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; } impl FromIntoArrow for Serial { type ArrowType = i64; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -996,19 +1000,10 @@ impl FromIntoArrow for Serial { fn into_arrow(self) -> Self::ArrowType { self.into() } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { - unreachable!() - } - - fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } impl FromIntoArrow for F32 { type ArrowType = f32; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -1017,19 +1012,10 @@ impl FromIntoArrow for F32 { fn into_arrow(self) -> Self::ArrowType { self.into() } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { - unreachable!() - } - - fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } impl FromIntoArrow for F64 { type ArrowType = f64; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { value.into() @@ -1038,19 +1024,10 @@ impl FromIntoArrow for F64 { fn into_arrow(self) -> Self::ArrowType { self.into() } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { - unreachable!() - } - - fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } impl FromIntoArrow for Date { type ArrowType = i32; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { Date(arrow_array::types::Date32Type::to_naive_date(value)) @@ -1059,19 +1036,10 @@ impl FromIntoArrow for Date { fn into_arrow(self) -> Self::ArrowType { arrow_array::types::Date32Type::from_naive_date(self.0) } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { - unreachable!() - } - - fn 
into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } impl FromIntoArrow for Time { type ArrowType = i64; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { Time( @@ -1089,31 +1057,12 @@ impl FromIntoArrow for Time { .num_microseconds() .unwrap() } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self { - unreachable!() - } - - fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } -impl FromIntoArrow for Timestamp { +impl FromIntoArrowWithUnit for Timestamp { type ArrowType = i64; type TimestampType = TimeUnit; - fn from_arrow(_value: Self::ArrowType) -> Self { - unreachable!() - } - - fn into_arrow(self) -> Self::ArrowType { - self.0 - .signed_duration_since(NaiveDateTime::default()) - .num_microseconds() - .unwrap() - } - fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { match time_unit { TimeUnit::Second => { @@ -1164,18 +1113,10 @@ impl FromIntoArrow for Timestamp { } } -impl FromIntoArrow for Timestamptz { +impl FromIntoArrowWithUnit for Timestamptz { type ArrowType = i64; type TimestampType = TimeUnit; - fn from_arrow(_value: Self::ArrowType) -> Self { - unreachable!() - } - - fn into_arrow(self) -> Self::ArrowType { - self.timestamp_micros() - } - fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { match time_unit { TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), @@ -1197,7 +1138,6 @@ impl FromIntoArrow for Timestamptz { impl FromIntoArrow for Interval { type ArrowType = ArrowIntervalType; - type TimestampType = (); fn from_arrow(value: Self::ArrowType) -> Self { ::to_interval(value) @@ -1206,14 +1146,6 @@ impl FromIntoArrow for Interval { fn into_arrow(self) -> Self::ArrowType { ::from_interval(self) } - - fn from_arrow_with_unit(_value: Self::ArrowType, _time_unit: Self::TimestampType) -> Self 
{ - unreachable!() - } - - fn into_arrow_with_unit(self, _time_unit: Self::TimestampType) -> Self::ArrowType { - unreachable!() - } } impl From<&DecimalArray> for arrow_array::LargeBinaryArray { From c83f421ccc80f91e75dccc49b762573178d7e81d Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Nov 2024 10:03:52 +0800 Subject: [PATCH 11/12] minor --- src/common/src/array/arrow/arrow_impl.rs | 32 ++++++++---------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 8c752a1820436..671f66a4eb849 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -726,7 +726,7 @@ pub trait FromArrow { &self, array: &arrow_array::TimestampSecondArray, ) -> Result { - Ok(ArrayImpl::Timestamp(array.into())) + Ok(ArrayImpl::Timestamptz(array.into())) } fn from_timestampms_array( @@ -958,6 +958,8 @@ converts!(Utf8Array, arrow_array::StringArray); converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); +converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); +converts!(SerialArray, arrow_array::Int64Array, @map); converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); @@ -969,9 +971,6 @@ converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); -converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); -converts!(SerialArray, arrow_array::Int64Array, @map); - /// Converts 
RisingWave value from and into Arrow value. trait FromIntoArrow { /// The corresponding element type in the Arrow array. @@ -1068,24 +1067,13 @@ impl FromIntoArrowWithUnit for Timestamp { TimeUnit::Second => { Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) } - TimeUnit::Millisecond => Timestamp( - DateTime::from_timestamp((value / 1_000) as _, (value % 1_000 * 1000000) as _) - .unwrap() - .naive_utc(), - ), - TimeUnit::Microsecond => Timestamp( - DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) - .unwrap() - .naive_utc(), - ), - TimeUnit::Nanosecond => Timestamp( - DateTime::from_timestamp( - (value / 1_000_000_000) as _, - (value % 1_000_000_000) as _, - ) - .unwrap() - .naive_utc(), - ), + TimeUnit::Millisecond => { + Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) + } + TimeUnit::Microsecond => { + Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc()) + } + TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()), } } From 4006d13d391dd7f4bced5f0e4054c92d115119d6 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Nov 2024 11:03:14 +0800 Subject: [PATCH 12/12] minor --- src/common/src/array/arrow/arrow_impl.rs | 25 +++++------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 671f66a4eb849..2e35a1403781f 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -855,7 +855,6 @@ impl From<&Bitmap> for arrow_buffer::NullBuffer { } /// Implement bi-directional `From` between concrete array types. - macro_rules! 
converts { ($ArrayType:ty, $ArrowType:ty) => { impl From<&$ArrayType> for $ArrowType { @@ -1079,24 +1078,10 @@ impl FromIntoArrowWithUnit for Timestamp { fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { match time_unit { - TimeUnit::Second => self - .0 - .signed_duration_since(NaiveDateTime::default()) - .num_seconds(), - TimeUnit::Millisecond => self - .0 - .signed_duration_since(NaiveDateTime::default()) - .num_milliseconds(), - TimeUnit::Microsecond => self - .0 - .signed_duration_since(NaiveDateTime::default()) - .num_microseconds() - .unwrap(), - TimeUnit::Nanosecond => self - .0 - .signed_duration_since(NaiveDateTime::default()) - .num_nanoseconds() - .unwrap(), + TimeUnit::Second => self.0.and_utc().timestamp(), + TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(), + TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(), + TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(), } } } @@ -1119,7 +1104,7 @@ impl FromIntoArrowWithUnit for Timestamptz { TimeUnit::Second => self.timestamp(), TimeUnit::Millisecond => self.timestamp_millis(), TimeUnit::Microsecond => self.timestamp_micros(), - TimeUnit::Nanosecond => self.timestamp_nanos().unwrap_or_default(), + TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(), } } }