From 5a1c05e36764ad5f261c0eb324d14c0c1e648a18 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 6 Aug 2024 15:30:38 +0000 Subject: [PATCH 1/4] Allow the LoggingDeadLetterQueue to pass the dead code lint --- src/dead_letters.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dead_letters.rs b/src/dead_letters.rs index d1acec7..9feb0f5 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -201,6 +201,7 @@ impl DeadLetterQueue for NoopDeadLetterQueue { /// Implementation of the [DeadLetterQueue] trait that writes dead letter content as warn logs. /// This implementation is currently only intended for debug development usage. /// Be mindful of your PII when using this implementation. +#[allow(dead_code)] pub(crate) struct LoggingDeadLetterQueue {} #[async_trait] From 902d438e8089b7a9442af85e23b536573e13720e Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 10 Aug 2024 14:22:45 +0000 Subject: [PATCH 2/4] new docker who dis --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 254b735..064dc0c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -52,7 +52,7 @@ jobs: toolchain: stable override: true - name: Teststack setup - run: docker-compose up setup + run: docker compose up setup - name: Run s3 feature tests run: cargo test --features s3 - name: Run azure feature tests From 81146666272288604526db75cf0b94df4c5c7875 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 16 Aug 2024 16:10:49 +0000 Subject: [PATCH 3/4] chore: pin the base version of delta-rs to 0.19.0 which has loads of improvements The performance improvements alone in this release are a a big deal for kafka-delta-ingest --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c1015a6..2e79863 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,9 +34,9 @@ uuid = { version = "0.8", features = ["serde", "v4"] } url = "2.3" # datafusion feature is required for writer version 2 -deltalake-core = { version = "~0.18.1", features = ["json", "datafusion"]} -deltalake-aws = { version = "~0.1.2", optional = true } -deltalake-azure = { version = "~0.1.3", optional = true } +deltalake-core = { version = "~0.19.0", features = ["json", "datafusion"]} +deltalake-aws = { version = "~0.1.4", optional = true } +deltalake-azure = { version = "~0.1.4", optional = true } # s3 feature enabled, helps for locking interactions with DLQ dynamodb_lock = { version = "0.6.0", optional = true } From 4d2fcbd9f9d0f5c899d903156e3e30e6c05eefef Mon Sep 17 00:00:00 2001 From: Kosmas Papadatos Date: Sat, 27 Jul 2024 10:18:16 +0300 Subject: [PATCH 4/4] Add epoch_millis_to_iso8601 transform function --- src/transforms.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/transforms.rs b/src/transforms.rs index b4803b6..6531692 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -76,6 +76,10 @@ lazy_static! { "epoch_seconds_to_iso8601", Box::new(create_epoch_seconds_to_iso8601_fn()), ); + runtime.register_function( + "epoch_millis_to_iso8601", + Box::new(create_epoch_millis_to_iso8601_fn()), + ); runtime.register_function( "epoch_micros_to_iso8601", Box::new(create_epoch_micros_to_iso8601_fn()), @@ -184,6 +188,13 @@ fn create_epoch_seconds_to_iso8601_fn() -> CustomFunction { } // TODO: Consolidate these custom function factories +fn create_epoch_millis_to_iso8601_fn() -> CustomFunction { + CustomFunction::new( + Signature::new(vec![ArgumentType::Number], None), + Box::new(jmespath_epoch_millis_to_iso8601), + ) +} + fn create_epoch_micros_to_iso8601_fn() -> CustomFunction { CustomFunction::new( Signature::new(vec![ArgumentType::Number], None), @@ -214,12 +225,14 @@ fn substr(args: &[Rcvar], context: &mut Context) -> Result enum EpochUnit { Seconds(i64), + Milliseconds(i64), Microseconds(i64), } fn iso8601_from_epoch(epoch_unit: EpochUnit) -> String { let dt = match epoch_unit { EpochUnit::Seconds(s) => Utc.timestamp_nanos(s * 1_000_000_000), + EpochUnit::Milliseconds(ms) => Utc.timestamp_nanos(ms * 1_000_000), EpochUnit::Microseconds(u) => Utc.timestamp_nanos(u * 1000), }; @@ -236,6 +249,16 @@ fn jmespath_epoch_seconds_to_iso8601( Ok(Arc::new(variable)) } +fn jmespath_epoch_millis_to_iso8601( + args: &[Rcvar], + context: &mut Context, +) -> Result { + let millis = i64_from_args(args, context, 0)?; + let value = serde_json::Value::String(iso8601_from_epoch(EpochUnit::Milliseconds(millis))); + let variable = Variable::try_from(value)?; + Ok(Arc::new(variable)) +} + fn jmespath_epoch_micros_to_iso8601( args: &[Rcvar], context: &mut Context,