feat(connector): introduce parquet file source (#17201)
wcy-fdu authored Jul 12, 2024
1 parent d06bb47 commit 102a60d
Showing 16 changed files with 446 additions and 33 deletions.
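
For orientation before the file-by-file diff: the user-facing surface introduced here is FORMAT PLAIN ENCODE PARQUET on the s3_v2 file source. Below is a minimal sketch distilled from the e2e test added in this commit (e2e_test/s3/fs_parquet_source.py); the host, bucket, and credential values are placeholders, not part of the commit.

    import psycopg2

    # Connect to a locally running RisingWave frontend (same defaults as the e2e test).
    conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev")
    cur = conn.cursor()

    # Declare a table that ingests parquet files matching the pattern.
    # FORMAT PLAIN ENCODE PARQUET is the encoding added by this commit.
    cur.execute("""CREATE TABLE demo_parquet(
        id bigint primary key,
        name varchar
    ) WITH (
        connector = 's3_v2',
        match_pattern = '*.parquet',
        s3.region_name = 'us-east-1',
        s3.bucket_name = 'my-bucket',
        s3.credentials.access = 'MY_ACCESS_KEY',
        s3.credentials.secret = 'MY_SECRET_KEY',
        s3.endpoint_url = 'https://s3.us-east-1.amazonaws.com'
    ) FORMAT PLAIN ENCODE PARQUET;""")

    cur.execute("select count(*) from demo_parquet")
    print(cur.fetchone()[0])
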
2 changes: 1 addition & 1 deletion ci/scripts/s3-source-test.sh
@@ -32,7 +32,7 @@ echo "--- starting risingwave cluster with connector node"
risedev ci-start ci-1cn-1fe

echo "--- Run test"
-python3 -m pip install --break-system-packages minio psycopg2-binary opendal
+python3 -m pip install --break-system-packages minio psycopg2-binary opendal pandas
if [[ -v format_type ]]; then
python3 e2e_test/s3/"$script" "$format_type"
else
22 changes: 22 additions & 0 deletions ci/workflows/main-cron.yml
@@ -478,6 +478,28 @@ steps:
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source check on parquet file"
key: "s3-v2-source-check-parquet-file"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source.py"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-s3-source-tests"
|| build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/
depends_on: build
plugins:
- seek-oss/aws-sm#v2.3.1:
env:
S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
- docker-compose#v5.1.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
environment:
- S3_SOURCE_TEST_CONF
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 25
retry: *auto-retry

- label: "S3_v2 source batch read on AWS (json parser)"
key: "s3-v2-source-batch-read-check-aws-json-parser"
command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json"
137 changes: 137 additions & 0 deletions e2e_test/s3/fs_parquet_source.py
@@ -0,0 +1,137 @@
import os
import sys
import random
import psycopg2
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timezone
from time import sleep
from minio import Minio
from random import uniform

def gen_data(file_num, item_num_per_file):
assert item_num_per_file % 2 == 0, \
f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
return [
[{
'id': file_id * item_num_per_file + item_id,
'name': f'{file_id}_{item_id}_{file_id * item_num_per_file + item_id}',
'sex': item_id % 2,
'mark': (-1) ** (item_id % 2),
'test_int': pa.scalar(1, type=pa.int32()),
'test_real': pa.scalar(4.0, type=pa.float32()),
'test_double_precision': pa.scalar(5.0, type=pa.float64()),
'test_varchar': pa.scalar('7', type=pa.string()),
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]

def do_test(config, file_num, item_num_per_file, prefix):
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

# Open a cursor to execute SQL statements
cur = conn.cursor()

def _table():
return 's3_test_parquet'

# Create the table backed by parquet files on S3
cur.execute(f'''CREATE TABLE {_table()}(
id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
test_int int,
test_real real,
test_double_precision double precision,
test_varchar varchar,
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
) WITH (
connector = 's3_v2',
match_pattern = '*.parquet',
s3.region_name = '{config['S3_REGION']}',
s3.bucket_name = '{config['S3_BUCKET']}',
s3.credentials.access = '{config['S3_ACCESS_KEY']}',
s3.credentials.secret = '{config['S3_SECRET_KEY']}',
s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
) FORMAT PLAIN ENCODE PARQUET;''')

total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
cur.execute(f'select count(*) from {_table()}')
result = cur.fetchone()
if result[0] == total_rows:
break
print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s")
sleep(10)

stmt = f'select count(*), sum(id) from {_table()}'
print(f'Execute {stmt}')
cur.execute(stmt)
result = cur.fetchone()

print('Got:', result)

def _assert_eq(field, got, expect):
assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.'

_assert_eq('count(*)', result[0], total_rows)
_assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2)

print('Test pass')

cur.execute(f'drop table {_table()}')
cur.close()
conn.close()


if __name__ == "__main__":
FILE_NUM = 10
ITEM_NUM_PER_FILE = 2000
data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE)

config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
client = Minio(
config["S3_ENDPOINT"],
access_key=config["S3_ACCESS_KEY"],
secret_key=config["S3_SECRET_KEY"],
secure=True,
)
run_id = str(random.randint(1000, 9999))
_local = lambda idx: f'data_{idx}.parquet'
_s3 = lambda idx: f"{run_id}_data_{idx}.parquet"

# put s3 files
for idx, file_data in enumerate(data):
table = pa.Table.from_pandas(pd.DataFrame(file_data))
pq.write_table(table, _local(idx))

client.fput_object(
config["S3_BUCKET"],
_s3(idx),
_local(idx)
)

# do test
do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id)

# clean up s3 files
for idx, _ in enumerate(data):
client.remove_object(config["S3_BUCKET"], _s3(idx))
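
A hypothetical local check, not part of the commit, that can help when debugging this test: read one of the generated files back with pyarrow before it is uploaded and confirm the schema matches the thirteen columns the CREATE TABLE above expects.

    import pyarrow.parquet as pq

    # Inspect a locally generated file (e.g. data_0.parquet) produced by the loop above.
    table = pq.read_table('data_0.parquet')

    # Expect int32 for test_int, float for test_real, timestamp[us] for test_timestamp,
    # and a tz-aware timestamp[us, tz=+00:00] for test_timestamptz.
    print(table.schema)
    print(table.num_rows, 'rows')
    print(table.slice(0, 1).to_pydict())
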
1 change: 1 addition & 0 deletions proto/plan_common.proto
@@ -165,6 +165,7 @@ enum EncodeType {
ENCODE_TYPE_TEMPLATE = 7;
ENCODE_TYPE_NONE = 8;
ENCODE_TYPE_TEXT = 9;
ENCODE_TYPE_PARQUET = 10;
}

enum RowFormatType {
12 changes: 11 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -524,9 +524,12 @@ pub trait FromArrow {
Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()),
Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()),
Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()),
-Timestamp(Microsecond, _) => {
+Timestamp(Microsecond, None) => {
self.from_timestampus_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Microsecond, Some(_)) => {
self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
}
@@ -628,6 +631,13 @@ pub trait FromArrow {
Ok(ArrayImpl::Timestamp(array.into()))
}

fn from_timestampus_some_array(
&self,
array: &arrow_array::TimestampMicrosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_interval_array(
&self,
array: &arrow_array::IntervalMonthDayNanoArray,
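
The arrow_impl.rs hunk above distinguishes microsecond timestamps by timezone: Timestamp(Microsecond, None) keeps mapping to RisingWave timestamp, while Timestamp(Microsecond, Some(_)) now maps to timestamptz through the new from_timestampus_some_array. A small pyarrow sketch of the two producer-side types this distinction keys on (values are illustrative; the e2e test builds the same pair for test_timestamp and test_timestamptz):

    import pyarrow as pa

    # No timezone: Arrow Timestamp(Microsecond, None) -> RisingWave `timestamp`.
    naive = pa.array([1_720_000_000_000_000], type=pa.timestamp('us'))

    # Timezone attached: Arrow Timestamp(Microsecond, Some(_)) -> RisingWave `timestamptz`.
    aware = pa.array([1_720_000_000_000_000], type=pa.timestamp('us', tz='+00:00'))

    print(naive.type)  # timestamp[us]
    print(aware.type)  # timestamp[us, tz=+00:00]
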
6 changes: 5 additions & 1 deletion src/connector/src/error.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::ArrayError;
use risingwave_common::error::def_anyhow_newtype;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
@@ -41,17 +42,20 @@ def_anyhow_newtype! {
url::ParseError => "failed to parse url",
serde_json::Error => "failed to parse json",
csv::Error => "failed to parse csv",

uuid::Error => transparent, // believed to be self-explanatory

// Connector errors
opendal::Error => transparent, // believed to be self-explanatory

parquet::errors::ParquetError => transparent,
ArrayError => "Array error",
sqlx::Error => transparent, // believed to be self-explanatory
mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
apache_avro::Error => "Avro error",
rdkafka::error::KafkaError => "Kafka error",
pulsar::Error => "Pulsar error",

async_nats::jetstream::consumer::StreamError => "Nats error",
async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
async_nats::jetstream::context::CreateStreamError => "Nats error",
4 changes: 4 additions & 0 deletions src/connector/src/parser/mod.rs
@@ -24,6 +24,7 @@ pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bail;
@@ -76,6 +77,7 @@ mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;

@@ -1117,6 +1119,7 @@ pub enum EncodingProperties {
Json(JsonProperties),
MongoJson,
Bytes(BytesProperties),
Parquet,
Native,
/// Encoding can't be specified because the source will determine it. Now only used in Iceberg.
None,
@@ -1170,6 +1173,7 @@ impl SpecificParserConfig {
delimiter: info.csv_delimiter as u8,
has_header: info.csv_has_header,
}),
(SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
(SourceFormat::Plain, SourceEncode::Avro)
| (SourceFormat::Upsert, SourceEncode::Avro) => {
let mut config = AvroProperties {
(Diffs for the remaining 9 of the 16 changed files are not shown.)
