support partition compute for sink
ZENOTME committed Jan 7, 2024
1 parent c8cdb9f commit 76b6931
Showing 30 changed files with 1,333 additions and 103 deletions.
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -119,7 +119,10 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
#icelake = { git = "https://github.com/ZENOTME/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
# "prometheus",
#] }
icelake = { git = "https://github.com/ZENOTME/icelake", branch = "transform", features = [
"prometheus",
] }
arrow-array = "49"
9 changes: 5 additions & 4 deletions e2e_test/iceberg/start_spark_connect_server.sh
@@ -8,9 +8,10 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION"

SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"


wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
tar -xzf $SPARK_FILE --no-same-owner
if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then
wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE
tar -xzf $SPARK_FILE --no-same-owner
fi

./spark-${SPARK_VERSION}-bin-hadoop3/sbin/start-connect-server.sh --packages $PACKAGES \
--master local[3] \
@@ -29,4 +30,4 @@ while ! nc -z localhost 15002; do
sleep 1 # wait 1 second before checking again
done

echo "Spark connect server launched"
echo "Spark connect server launched"
12 changes: 11 additions & 1 deletion proto/expr.proto
@@ -259,6 +259,8 @@ message ExprNode {
JSONB_PATH_QUERY_ARRAY = 622;
JSONB_PATH_QUERY_FIRST = 623;

CONSTRUCT_STRUCT = 624;

// Non-pure functions below (> 1000)
// ------------------------
// Internal functions
@@ -274,6 +276,14 @@ message ExprNode {
// System information functions
PG_GET_INDEXDEF = 2400;
COL_DESCRIPTION = 2401;

// EXTERNAL
ICEBERG_BUCKET = 2201;
ICEBERG_TRUNCATE = 2202;
ICEBERG_YEAR = 2203;
ICEBERG_MONTH = 2204;
ICEBERG_DAY = 2205;
ICEBERG_HOUR = 2206;
}
Type function_type = 1;
data.DataType return_type = 3;
@@ -283,7 +293,7 @@
FunctionCall func_call = 6;
UserDefinedFunction udf = 7;
NowRexNode now = 8;
}
}
}

message TableFunction {
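The new function types line up with Iceberg's partition transforms (bucket, truncate, year, month, day, hour), which need to be evaluated in the plan so each row's partition value can be computed before it reaches the sink; `CONSTRUCT_STRUCT` presumably packs those transformed values into a single struct-typed column. As a rough, self-contained illustration of the bucket and truncate semantics from the Iceberg spec — not the icelake implementation, and with `DefaultHasher` standing in for the 32-bit Murmur3 hash the spec actually mandates:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Iceberg-style `bucket[n]`: hash the value, mask it to a non-negative i32,
/// then take it modulo the bucket count. (Iceberg specifies 32-bit Murmur3;
/// DefaultHasher is used here only to keep the sketch dependency-free.)
fn bucket<T: Hash>(value: &T, n: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    let h = (hasher.finish() as i32) & i32::MAX;
    (h as u32) % n
}

/// Iceberg-style `truncate[w]` for integers: round towards negative infinity
/// to a multiple of the width `w`.
fn truncate_i32(value: i32, w: i32) -> i32 {
    value - value.rem_euclid(w)
}

fn main() {
    println!("bucket(42, 16)   = {}", bucket(&42i64, 16));
    println!("truncate(-7, 10) = {}", truncate_i32(-7, 10)); // -10
    println!("truncate(17, 10) = {}", truncate_i32(17, 10)); //  10
}
```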
1 change: 1 addition & 0 deletions proto/stream_plan.proto
@@ -223,6 +223,7 @@ message SinkDesc {
string sink_from_name = 12;
catalog.SinkFormatDesc format_desc = 13;
optional uint32 target_table = 14;
bool with_extra_partition_col = 15;
}

enum SinkLogStoreType {
9 changes: 8 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -133,6 +133,7 @@ macro_rules! converts_generic {
fn try_from(array: &ArrayImpl) -> Result<Self, Self::Error> {
match array {
$($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)*
ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?.with_timezone_utc())),
_ => todo!("unsupported array"),
}
}
@@ -152,6 +153,13 @@ macro_rules! converts_generic {
.unwrap()
.try_into()?,
)),)*
Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz(
array
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.try_into()?,
)),
t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
}
}
@@ -173,7 +181,6 @@ converts_generic! {
{ arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 },
{ arrow_array::Date32Array, Date32, ArrayImpl::Date },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz },
{ arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time },
{ arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval },
{ arrow_array::StructArray, Struct(_), ArrayImpl::Struct },
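Pulling `Timestamptz` out of the `converts_generic!` table lets the Rust-to-Arrow direction call `.with_timezone_utc()` on the converted array, so the Arrow logical type carries a timezone rather than being a naive timestamp, while the Arrow-to-Rust direction now matches `Timestamp(Microsecond, Some(_))` explicitly. A small standalone sketch of that Arrow-side behaviour, assuming only the `arrow-array` and `arrow-schema` crates:

```rust
use arrow_array::{Array, TimestampMicrosecondArray};
use arrow_schema::{DataType, TimeUnit};

fn main() {
    // Microseconds since the Unix epoch; `None` models a NULL value.
    let naive = TimestampMicrosecondArray::from(vec![Some(1_700_000_000_000_000i64), None]);
    assert_eq!(
        naive.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );

    // Attaching the UTC timezone changes only the logical type, not the stored
    // values, which is what the explicit Timestamptz arm adds on top of the
    // plain microsecond conversion.
    let utc = naive.with_timezone_utc();
    assert!(matches!(
        utc.data_type(),
        DataType::Timestamp(TimeUnit::Microsecond, Some(_))
    ));
}
```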
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c
"xz",
] }
arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-nats = "0.33"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
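`arrow-row` and `arrow-select` are presumably added so the sink can group a chunk's rows by their computed partition values and write each group to its own partition. A rough sketch of that grouping pattern — not the connector's actual code; the toy columns and the group-by-encoded-row-bytes approach are illustrative assumptions:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, StringArray, UInt32Array};
use arrow_row::{RowConverter, SortField};
use arrow_schema::DataType;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A toy chunk: one data column plus one computed partition column.
    let data: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let partition: ArrayRef = Arc::new(Int32Array::from(vec![1, 0, 1, 0]));

    // Encode the partition column into comparable byte rows.
    let mut converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
    let rows = converter.convert_columns(&[partition])?;

    // Collect the indices of rows that share the same partition value.
    let mut groups: HashMap<Vec<u8>, Vec<u32>> = HashMap::new();
    for (i, row) in rows.iter().enumerate() {
        groups.entry(row.as_ref().to_vec()).or_default().push(i as u32);
    }

    // Gather each group's data rows with the `take` kernel before writing.
    for indices in groups.values() {
        let indices = UInt32Array::from(indices.clone());
        let group = arrow_select::take::take(&data, &indices, None)?;
        println!("{group:?}");
    }
    Ok(())
}
```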
5 changes: 5 additions & 0 deletions src/connector/src/sink/catalog/desc.rs
@@ -67,6 +67,10 @@ pub struct SinkDesc {

/// Id of the target table for sink into table.
pub target_table: Option<TableId>,

/// Indicates whether the sink accepts data chunks carrying an extra partition column.
/// For more detail on the partition column, see `PartitionComputeInfo`.
pub with_extra_partition_col: bool,
}

impl SinkDesc {
@@ -123,6 +127,7 @@
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
with_extra_partition_col: self.with_extra_partition_col,
}
}
}
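Per the new doc comment, a sink created with `with_extra_partition_col` set receives chunks that carry one additional trailing column holding the precomputed partition value (see `PartitionComputeInfo`). A hypothetical sketch of splitting such a chunk back into data columns plus the partition column — the helper, the column names, and the use of an Arrow `RecordBatch` are illustrative, not RisingWave's actual types:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

/// Hypothetical helper: split a batch into its data columns and the trailing
/// partition column appended by the upstream plan.
fn split_partition_col(batch: &RecordBatch) -> (RecordBatch, ArrayRef) {
    let n = batch.num_columns() - 1;
    let partition = batch.column(n).clone();
    let data = batch
        .project(&(0..n).collect::<Vec<_>>())
        .expect("valid projection");
    (data, partition)
}

fn main() {
    // "v" is a user column; "_partition" stands in for the extra column added
    // when `with_extra_partition_col` is true (both names are made up).
    let schema = Arc::new(Schema::new(vec![
        Field::new("v", DataType::Utf8, false),
        Field::new("_partition", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1, 0])) as ArrayRef,
        ],
    )
    .unwrap();

    let (data, partition) = split_partition_col(&batch);
    assert_eq!(data.num_columns(), 1);
    assert_eq!(partition.len(), 2);
}
```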