Commit: draft partition compute
ZENOTME committed Jan 3, 2024
1 parent bb66778 commit a7c11fe
Showing 21 changed files with 971 additions and 49 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

5 changes: 4 additions & 1 deletion Cargo.toml
@@ -118,7 +118,10 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
#icelake = { git = "https://github.com/ZENOTME/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
# "prometheus",
#] }
icelake = { git = "https://github.com/ZENOTME/icelake", branch = "transform" , features = [
"prometheus",
] }
arrow-array = "49"
15 changes: 14 additions & 1 deletion proto/expr.proto
@@ -271,6 +271,19 @@ message ExprNode {
// Administration functions
COL_DESCRIPTION = 2100;
CAST_REGCLASS = 2101;

CONSTRUCT_STRUCT = 2111;

// EXTERNAL
ICEBERG_PARTITION = 2102;
ICEBERG_BUCKET_I32 = 2103;
ICEBERG_BUCKET_I64 = 2104;
ICEBERG_BUCKET_DECIMAL = 2105;
ICEBERG_TRUNCATE_I32 = 2106;
ICEBERG_YEAR = 2107;
ICEBERG_MONTH = 2108;
ICEBERG_DAY = 2109;
ICEBERG_HOUR = 2110;
}
Type function_type = 1;
data.DataType return_type = 3;
@@ -280,7 +293,7 @@ message ExprNode {
FunctionCall func_call = 6;
UserDefinedFunction udf = 7;
NowRexNode now = 8;
}
}
}

message TableFunction {
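The new ICEBERG_* expression types mirror Iceberg's partition transforms, which this commit precomputes on the RisingWave side. As a rough illustration of the semantics (this is not the commit's code), the bucket transform per the Iceberg spec hashes the value's little-endian bytes with 32-bit Murmur3 (seed 0), clears the sign bit, and reduces modulo the bucket count, while truncate rounds toward negative infinity to a multiple of the width. A minimal Rust sketch, assuming the `murmur3` crate:

use std::io::Cursor;

use murmur3::murmur3_32; // assumed helper crate, not a dependency of this commit

// bucket(n, v): Iceberg widens 32-bit integers to 64 bits before hashing.
fn iceberg_bucket_i64(v: i64, n: u32) -> u32 {
    let hash = murmur3_32(&mut Cursor::new(v.to_le_bytes()), 0).unwrap();
    // Clear the sign bit, then reduce modulo the bucket count.
    (hash & i32::MAX as u32) % n
}

fn iceberg_bucket_i32(v: i32, n: u32) -> u32 {
    iceberg_bucket_i64(v as i64, n)
}

// truncate(w, v): round toward negative infinity to a multiple of `w`.
fn iceberg_truncate_i32(v: i32, w: i32) -> i32 {
    v - (((v % w) + w) % w)
}

The YEAR/MONTH/DAY/HOUR variants similarly map a timestamp to the number of whole units elapsed since the Unix epoch.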
9 changes: 8 additions & 1 deletion src/common/src/array/arrow/arrow_impl.rs
@@ -133,6 +133,7 @@ macro_rules! converts_generic {
fn try_from(array: &ArrayImpl) -> Result<Self, Self::Error> {
match array {
$($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)*
ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?.with_timezone_utc())),
_ => todo!("unsupported array"),
}
}
@@ -152,6 +153,13 @@ macro_rules! converts_generic {
.unwrap()
.try_into()?,
)),)*
Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz(
array
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.try_into()?,
)),
t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))),
}
}
@@ -173,7 +181,6 @@ converts_generic! {
{ arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 },
{ arrow_array::Date32Array, Date32, ArrayImpl::Date },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp },
{ arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz },
{ arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time },
{ arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval },
{ arrow_array::StructArray, Struct(_), ArrayImpl::Struct },
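The Timestamptz conversion moves out of the generic macro because the Arrow array must carry an explicit UTC timezone: `try_from` alone yields Timestamp(Microsecond, None), and `with_timezone_utc` attaches the annotation that the Timestamp(Microsecond, Some(_)) arm expects. A small stand-alone illustration (not from this commit):

use arrow_array::{Array, TimestampMicrosecondArray};
use arrow_schema::DataType;

fn main() {
    // A bare array is naive: Timestamp(Microsecond, None).
    let naive = TimestampMicrosecondArray::from(vec![1_700_000_000_000_000i64]);
    assert!(matches!(naive.data_type(), DataType::Timestamp(_, None)));

    // Attaching UTC yields Timestamp(Microsecond, Some("+00:00")), matching
    // the Timestamptz branch above.
    let utc = naive.with_timezone_utc();
    assert!(matches!(utc.data_type(), DataType::Timestamp(_, Some(_))));
}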
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
@@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c
"xz",
] }
arrow-array = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-nats = "0.32"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
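The new arrow-row and arrow-select dependencies are presumably what the precomputed partition writer below uses to group rows: arrow-row provides a byte-comparable encoding of partition values, and arrow-select's `take` kernel materializes one sub-batch per group. A hedged sketch of that pattern (illustrative only; the actual logic lives in precomputed_partition_writer.rs, which is not shown in this excerpt):

use std::collections::HashMap;

use arrow_array::{Array, ArrayRef, RecordBatch, UInt32Array};
use arrow_row::{RowConverter, SortField};
use arrow_schema::ArrowError;
use arrow_select::take::take;

// Split a batch into one sub-batch per distinct value of a precomputed
// partition column.
fn split_by_partition(
    batch: &RecordBatch,
    partition_col: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let part = batch.column(partition_col);
    // Encode partition values into byte-comparable rows.
    let converter = RowConverter::new(vec![SortField::new(part.data_type().clone())])?;
    let rows = converter.convert_columns(std::slice::from_ref(part))?;

    // Group row indices by identical encoded partition value.
    let mut groups: HashMap<Vec<u8>, Vec<u32>> = HashMap::new();
    for (i, row) in rows.iter().enumerate() {
        groups.entry(row.as_ref().to_vec()).or_default().push(i as u32);
    }

    // Materialize each group with the `take` kernel.
    groups
        .into_values()
        .map(|indices| {
            let indices = UInt32Array::from(indices);
            let columns = batch
                .columns()
                .iter()
                .map(|c| take(c.as_ref(), &indices, None))
                .collect::<Result<Vec<ArrayRef>, _>>()?;
            RecordBatch::try_new(batch.schema(), columns)
        })
        .collect()
}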
64 changes: 51 additions & 13 deletions src/connector/src/sink/iceberg/mod.rs
@@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod precomputed_partition_writer;
mod prometheus;

use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;

use anyhow::anyhow;
use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
};
use async_trait::async_trait;
use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter};
@@ -44,7 +47,6 @@ use url::Url;
use with_options::WithOptions;

use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder;
use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
use super::{
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
@@ -163,7 +165,7 @@ impl IcebergConfig {
Ok(config)
}

fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
pub fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let catalog_type = self
@@ -391,6 +393,40 @@ enum IcebergWriterEnum {
}

impl IcebergWriter {
fn schema(table: &Table) -> Result<SchemaRef> {
let schema = table.current_arrow_schema()?;
// TODO:
// When the iceberg table is partitioned, we append a partition column to the schema.
// Maybe we should accept a flag from `SinkWriterParam` instead?
if !table
.current_table_metadata()
.current_partition_spec()?
.is_unpartitioned()
{
let mut fields = schema.fields().to_vec();
// TODO:
// When converting a RisingWave struct to an Arrow struct, we compare the type of each field.
// To keep metadata from affecting that comparison, we reconstruct the struct type without metadata.
// This approach is hacky; do we have a better way?
let partition_type = if let ArrowDataType::Struct(s) =
table.current_partition_type()?.try_into()?
{
let fields = Fields::from(
s.into_iter()
.map(|field| ArrowField::new(field.name(), field.data_type().clone(), true))
.collect::<Vec<_>>(),
);
ArrowDataType::Struct(fields)
} else {
unimplemented!()
};
fields.push(ArrowField::new("_rw_partition", partition_type, false).into());
Ok(ArrowSchema::new(fields).into())
} else {
Ok(schema)
}
}

pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
let builder_helper = table.builder_helper()?;

@@ -402,10 +438,11 @@ impl IcebergWriter {
.iceberg_rolling_unflushed_data_file
.clone(),
));
let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new(
builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?,
writer_param.sink_metrics.iceberg_partition_num.clone(),
);
let partition_data_file_builder =
precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new(
data_file_builder.clone(),
table.current_partition_type()?,
);
let dispatch_builder = builder_helper
.dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?;
// wrap a layer to collect write metrics
@@ -420,7 +457,7 @@
.clone(),
),
);
let schema = table.current_arrow_schema()?;
let schema = Self::schema(&table)?;
let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?);
Ok(Self {
inner_writer: IcebergWriterEnum::AppendOnly(inner_writer),
@@ -457,10 +494,11 @@
equality_delete_builder,
unique_column_ids,
);
let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new(
builder_helper.fanout_partition_writer_builder(delta_builder.clone())?,
writer_param.sink_metrics.iceberg_partition_num.clone(),
);
let partition_delta_builder =
precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new(
delta_builder.clone(),
table.current_partition_type()?,
);
let dispatch_builder =
builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?;
// wrap a layer to collect write metrics
@@ -475,7 +513,7 @@
.clone(),
),
);
let schema = table.current_arrow_schema()?;
let schema = Self::schema(&table)?;
let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?);
Ok(Self {
inner_writer: IcebergWriterEnum::Upsert(inner_writer),
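On the metadata TODO in `schema()` above: Arrow's `Field` equality includes metadata, so a struct type whose fields carry Iceberg field IDs compares unequal to an otherwise identical plain one, which is why the partition type is rebuilt from bare fields. A small demonstration (not from this commit; the metadata key is just an example):

use std::collections::HashMap;

use arrow_schema::{DataType, Field};

fn main() {
    let plain = Field::new("id", DataType::Int32, true);
    // Metadata participates in Field equality, so the decorated field no
    // longer matches the plain one.
    let tagged = plain
        .clone()
        .with_metadata(HashMap::from([("PARQUET:field_id".to_string(), "1".to_string())]));
    assert_ne!(plain, tagged);
}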
(Remaining changed files not shown.)
