diff --git a/Cargo.lock b/Cargo.lock index 9851396d00dae..9bcc9fa697672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + [[package]] name = "arrayvec" version = "0.7.4" @@ -302,6 +308,7 @@ dependencies = [ "arrow-data 48.0.1", "arrow-schema 48.0.1", "chrono", + "chrono-tz", "half 2.3.1", "hashbrown 0.14.0", "num", @@ -357,6 +364,7 @@ dependencies = [ "arrow-schema 48.0.1", "arrow-select 48.0.1", "chrono", + "comfy-table", "half 2.3.1", "lexical-core", "num", @@ -650,6 +658,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-compression" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd 0.13.0", + "zstd-safe 7.0.0", +] + [[package]] name = "async-executor" version = "1.5.1" @@ -1558,6 +1584,28 @@ dependencies = [ "triple_accel", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest 0.10.7", +] + +[[package]] +name = "blake3" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -2158,6 +2206,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "convert_case" version = "0.6.0" @@ -2712,6 +2766,221 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41b319d1b62ffbd002e057f36bebd1f42b9f97927c9577461d855f3513c4289f" +[[package]] +name = "datafusion" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "676796427e638d85e9eadf13765705212be60b34f8fc5d3934d95184c63ca1b4" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-schema 48.0.1", + "async-compression", + "async-trait", + "bytes", + "bzip2", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-optimizer", + "datafusion-physical-expr", + "datafusion-physical-plan", + "datafusion-sql", + "flate2", + "futures", + "glob", + "half 2.3.1", + "hashbrown 0.14.0", + "indexmap 2.0.0", + "itertools 0.11.0", + "log", + "num_cpus", + "object_store", + "parking_lot 0.12.1", + "parquet 48.0.1", + "pin-project-lite", + "rand", + "sqlparser", + "tempfile", + "tokio", + "tokio-util", + "url", + "uuid", + "xz2", + "zstd 0.13.0", +] + +[[package]] +name = "datafusion-common" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e23b3d21a6531259d291bd20ce59282ea794bda1018b0a1e278c13cd52e50c" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "half 2.3.1", + "num_cpus", + "object_store", + "parquet 48.0.1", + "sqlparser", +] + +[[package]] +name = "datafusion-execution" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de1fd0d8db0f2b8e4f4121bfa1c7c09d3a5c08a0a65c2229cd849eb65cff855" +dependencies = [ + "arrow", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-expr", + "futures", + "hashbrown 0.14.0", + "log", + "object_store", + "parking_lot 0.12.1", + "rand", + "tempfile", + "url", +] + +[[package]] +name = "datafusion-expr" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e227fe88bf6730cab378d0cd8fc4c6b2ea42bc7e414a8ea9feba7225932735" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "datafusion-common", + "sqlparser", + "strum", + "strum_macros", +] + +[[package]] +name = "datafusion-optimizer" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6648e62ea7605b9bfcd87fdc9d67e579c3b9ac563a87734ae5fe6d79ee4547" +dependencies = [ + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown 0.14.0", + "itertools 0.11.0", + "log", + "regex-syntax 0.8.0", +] + +[[package]] +name = "datafusion-physical-expr" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f32b8574add16a32411a9b3fb3844ac1fc09ab4e7be289f86fd56d620e4f2508" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-ord 48.0.1", + "arrow-schema 48.0.1", + "base64 0.21.4", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-expr", + "half 2.3.1", + "hashbrown 0.14.0", + "hex", + "indexmap 2.0.0", + "itertools 0.11.0", + "libc", + "log", + "md-5 0.10.5", + "paste", + "petgraph", + "rand", + "regex", + "sha2 0.10.7", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-physical-plan" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796abd77d5bfecd9e5275a99daf0ec45f5b3a793ec431349ce8211a67826fd22" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-schema 48.0.1", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "half 2.3.1", + "hashbrown 0.14.0", + "indexmap 2.0.0", + "itertools 0.11.0", + "log", + "once_cell", + "parking_lot 0.12.1", + "pin-project-lite", + "rand", + "tokio", + "uuid", +] + +[[package]] +name = "datafusion-proto" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26de2592417beb20f73f29b131a04d7de14e2a6336c631554d611584b4306236" +dependencies = [ + "arrow", + "chrono", + "datafusion", + "datafusion-common", + "datafusion-expr", + "object_store", + "prost 0.12.1", +] + +[[package]] +name = "datafusion-sql" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced70b8a5648ba7b95c61fc512183c33287ffe2c9f22ffe22700619d7d48c84f" +dependencies = [ + "arrow", + "arrow-schema 48.0.1", + "datafusion-common", + "datafusion-expr", + "log", + "sqlparser", +] + [[package]] name = "debugid" version = "0.8.0" @@ -2746,6 +3015,13 @@ dependencies = [ "bytes", "cfg-if", "chrono", + "dashmap", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-proto", + "datafusion-sql", "dynamodb_lock", "either", "errno", @@ -2772,6 +3048,7 @@ dependencies = [ "rusoto_sts", "serde", "serde_json", + "sqlparser", "thiserror", "tokio", "url", @@ -8201,6 +8478,7 @@ dependencies = [ "clickhouse", "criterion", "csv", + "datafusion", "deltalake", "duration-str", "easy-ext", @@ -10497,6 +10775,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlparser" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "sqlx" version = "0.7.1" @@ -12226,6 +12525,8 @@ dependencies = [ "ahash 0.8.3", "allocator-api2", "anyhow", + "arrow-array 48.0.1", + "arrow-cast 48.0.1", "arrow-schema 48.0.1", "async-std", "auto_enums", @@ -12242,9 +12543,12 @@ dependencies = [ "bytes", "cc", "chrono", + "chrono-tz", + "chrono-tz-build", "clap", "clap_builder", "combine", + "comfy-table", "crossbeam-epoch", "crossbeam-utils", "crypto-bigint 0.5.5", diff --git a/Cargo.toml b/Cargo.toml index a9c4c618d4078..97c5318411f0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,7 +133,7 @@ arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" } arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" } arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" } -deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3"] } +deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3","datafusion"] } parquet = "49" thiserror-ext = "0.0.10" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } diff --git a/ci/scripts/e2e-deltalake-sink-rust-test.sh b/ci/scripts/e2e-deltalake-sink-rust-test.sh old mode 100644 new mode 100755 index 84feb10248a5e..e5829ac46a11d --- a/ci/scripts/e2e-deltalake-sink-rust-test.sh +++ b/ci/scripts/e2e-deltalake-sink-rust-test.sh @@ -57,18 +57,18 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/deltalake_rust_sink.slt' sleep 1 -spark-3.3.1-bin-hadoop3/bin/spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 \ +spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \ --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \ --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \ --conf 'spark.hadoop.fs.s3a.endpoint=http://localhost:9301' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ - --S --e 'INSERT OVERWRITE DIRECTORY './spark-output' USING CSV SELECT * FROM delta.`s3a://deltalake/deltalake-test`;' + --S --e 'INSERT OVERWRITE DIRECTORY "./spark-output" USING CSV SELECT * FROM delta.`s3a://deltalake/deltalake-test`;' # check sink destination using shell if cat ./spark-output/*.csv | sort | awk -F "," '{ - exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T09:01:01.000+08:00" && $9 == false); }'; then + exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000+08:00" && $9 == "false"); }'; then echo "DeltaLake sink check passed" else echo "The output is not as expected." diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt index 2c53a1c604ab2..2b44f7fb38ac6 100644 --- a/e2e_test/sink/deltalake_rust_sink.slt +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -10,7 +10,7 @@ with ( connector = 'deltalake_rust', type = 'append-only', force_append_only = 'true', - path = 's3a://deltalake/deltalake-test', + location = 's3a://deltalake/deltalake-test', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', s3.endpoint = 'http://127.0.0.1:9301', diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3d69ed4302321..0fede0984b8e3 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6 "time", ] } csv = "1.3" +datafusion = "33.0.0" deltalake = { workspace = true } duration-str = "0.7.0" easy-ext = "1" diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index c173ddf938e04..8b6a6c604dfc2 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -54,8 +54,8 @@ pub struct DeltaLakeCommon { pub s3_access_key: Option, #[serde(rename = "s3.secret.key")] pub s3_secret_key: Option, - #[serde(rename = "path")] - pub path: String, + #[serde(rename = "location")] + pub location: String, #[serde(rename = "s3.region")] pub s3_region: Option, #[serde(rename = "s3.endpoint")] @@ -63,7 +63,7 @@ pub struct DeltaLakeCommon { } impl DeltaLakeCommon { pub async fn create_deltalake_client(&self) -> Result { - let table = match Self::get_table_url(&self.path)? { + let table = match Self::get_table_url(&self.location)? { DeltaTableUrl::S3(s3_path) => { let mut storage_options = HashMap::new(); storage_options.insert( @@ -416,15 +416,8 @@ impl SinkWriter for DeltaLakeSinkWriter { pub struct DeltaLakeSinkCommitter { table: DeltaTable, } - -#[async_trait::async_trait] -impl SinkCommitCoordinator for DeltaLakeSinkCommitter { - async fn init(&mut self) -> Result<()> { - tracing::info!("DeltaLake commit coordinator inited."); - Ok(()) - } - - async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { +impl DeltaLakeSinkCommitter { + pub async fn commit_inner(&mut self, epoch: u64, metadata: Vec) -> Result<()> { tracing::info!("Starting DeltaLake commit in epoch {epoch}."); let deltalake_write_result = metadata @@ -467,6 +460,18 @@ impl SinkCommitCoordinator for DeltaLakeSinkCommitter { } } +#[async_trait::async_trait] +impl SinkCommitCoordinator for DeltaLakeSinkCommitter { + async fn init(&mut self) -> Result<()> { + tracing::info!("DeltaLake commit coordinator inited."); + Ok(()) + } + + async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { + self.commit_inner(epoch, metadata).await + } +} + #[derive(Serialize, Deserialize)] struct DeltaLakeWriteResult { adds: Vec, @@ -498,3 +503,99 @@ impl DeltaLakeWriteResult { } } } + +#[cfg(test)] +mod test { + use std::fs; + use std::sync::Arc; + + use datafusion::prelude::*; + use deltalake::kernel::DataType as SchemaDataType; + use deltalake::operations::create::CreateBuilder; + use maplit::hashmap; + use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array}; + use risingwave_common::catalog::{Field, Schema}; + + use super::{DeltaLakeConfig, DeltaLakeSinkWriter}; + use crate::sink::deltalake::DeltaLakeSinkCommitter; + use crate::sink::writer::SinkWriter; + use crate::source::DataType; + + fn remove_dir(path: &str) { + if fs::metadata(path).is_ok() && fs::metadata(path).unwrap().is_dir() { + fs::remove_dir_all(path).unwrap(); + } + } + #[tokio::test] + async fn test_deltalake() { + let path = "./deltalake-test"; + remove_dir(path); + CreateBuilder::new() + .with_location(path) + .with_column("id", SchemaDataType::integer(), false, Default::default()) + .with_column("name", SchemaDataType::string(), false, Default::default()) + .await + .unwrap(); + + let properties = hashmap! { + "connector".to_string() => "deltalake_rust".to_string(), + "force_append_only".to_string() => "true".to_string(), + "type".to_string() => "append-only".to_string(), + "location".to_string() => format!("file://{}",path), + }; + + let schema = Schema::new(vec![ + Field { + data_type: DataType::Int32, + name: "id".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::Varchar, + name: "name".into(), + sub_fields: vec![], + type_name: "".into(), + }, + ]); + + let deltalake_config = DeltaLakeConfig::from_hashmap(properties).unwrap(); + let deltalake_table = deltalake_config + .common + .create_deltalake_client() + .await + .unwrap(); + + let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0]) + .await + .unwrap(); + let chunk = StreamChunk::new( + vec![Op::Insert, Op::Insert, Op::Insert], + vec![ + I32Array::from_iter(vec![1, 2, 3]).into_ref(), + Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(), + ], + ); + deltalake_writer.write(chunk).await.unwrap(); + let mut committer = DeltaLakeSinkCommitter { + table: deltalake_table, + }; + let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap(); + committer.commit_inner(1, vec![metadata]).await.unwrap(); + + let ctx = SessionContext::new(); + let table = deltalake::open_table(path).await.unwrap(); + ctx.register_table("demo", Arc::new(table)).unwrap(); + + let batches = ctx + .sql("SELECT * FROM demo") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(3, batches.get(0).unwrap().column(0).len()); + assert_eq!(3, batches.get(0).unwrap().column(1).len()); + remove_dir(path); + } +} diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 17c553b2f3e40..4770fd621ad3c 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -74,7 +74,7 @@ DeltaLakeConfig: - name: s3.secret.key field_type: String required: false - - name: path + - name: location field_type: String required: true - name: s3.region diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index bd362d308c45b..83f6711d14274 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -21,6 +21,8 @@ publish = false ahash = { version = "0.8" } allocator-api2 = { version = "0.2", default-features = false, features = ["alloc", "nightly"] } anyhow = { version = "1", features = ["backtrace"] } +arrow-array = { version = "48", default-features = false, features = ["chrono-tz"] } +arrow-cast = { version = "48", default-features = false, features = ["prettyprint"] } arrow-schema = { version = "48", default-features = false, features = ["serde"] } async-std = { version = "1", features = ["attributes", "tokio1"] } aws-credential-types = { version = "1", default-features = false, features = ["hardcoded-credentials"] } @@ -35,9 +37,11 @@ bitflags = { version = "2", default-features = false, features = ["serde", "std" byteorder = { version = "1" } bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } +chrono-tz = { version = "0.8", features = ["case-insensitive"] } clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4", features = ["tokio"] } +comfy-table = { version = "7" } crossbeam-epoch = { version = "0.9" } crossbeam-utils = { version = "0.8" } crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] } @@ -155,6 +159,7 @@ auto_enums = { version = "0.8", features = ["futures03", "tokio1"] } bitflags = { version = "2", default-features = false, features = ["serde", "std"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } +chrono-tz-build = { version = "0.2", default-features = false, features = ["case-insensitive"] } deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] } either = { version = "1", features = ["serde"] } fixedbitset = { version = "0.4" }