Skip to content

Commit

Permalink
support precompute partition
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jan 22, 2024
1 parent 732e582 commit bc926bd
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 136 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [
icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f61f900d6914d4a28c00c2af6374a4dda6d95d4", features = [
"prometheus",
] }
arrow-array = "49"
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ message SinkDesc {
string sink_from_name = 12;
catalog.SinkFormatDesc format_desc = 13;
optional uint32 target_table = 14;
optional uint64 extra_partition_col_idx = 15;
}

enum SinkLogStoreType {
Expand Down
9 changes: 8 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ macro_rules! converts_generic {
fn try_from(array: &ArrayImpl) -> Result<Self, Self::Error> {
match array {
$($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)*
ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())),
_ => todo!("unsupported array"),
}
}
Expand All @@ -152,6 +153,13 @@ macro_rules! converts_generic {
.unwrap()
.try_into()?,
)),)*
Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz(
array
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.try_into()?,
)),
t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
}
}
Expand All @@ -173,7 +181,6 @@ converts_generic! {
{ arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 },
{ arrow_array::Date32Array, Date32, ArrayImpl::Date },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz },
{ arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time },
{ arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval },
{ arrow_array::StructArray, Struct(_), ArrayImpl::Struct },
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c
"xz",
] }
arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-nats = "0.33"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub struct SinkDesc {

/// Id of the target table for sink into table.
pub target_table: Option<TableId>,

/// Indicate whether the sink accepts the data chunk with extra partition column.
/// For more detil of partition column, see `PartitionComputeInfo`
pub extra_partition_col_idx: Option<usize>,
}

impl SinkDesc {
Expand Down Expand Up @@ -123,6 +127,7 @@ impl SinkDesc {
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64),
}
}
}
Loading

0 comments on commit bc926bd

Please sign in to comment.