Skip to content

Commit

Permalink
Merge branch 'main' into message-filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Nekit2217 authored Aug 18, 2024
2 parents 2c2fbea + 4d2fcbd commit 1fcc6c7
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
toolchain: stable
override: true
- name: Teststack setup
run: docker-compose up setup
run: docker compose up setup
- name: Run s3 feature tests
run: cargo test --features s3
- name: Run azure feature tests
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"

# datafusion feature is required for writer version 2
deltalake-core = { version = "~0.18.1", features = ["json", "datafusion"]}
deltalake-aws = { version = "~0.1.2", optional = true }
deltalake-azure = { version = "~0.1.3", optional = true }
deltalake-core = { version = "~0.19.0", features = ["json", "datafusion"]}
deltalake-aws = { version = "~0.1.4", optional = true }
deltalake-azure = { version = "~0.1.4", optional = true }

# s3 feature enabled, helps for locking interactions with DLQ
dynamodb_lock = { version = "0.6.0", optional = true }
Expand Down
1 change: 1 addition & 0 deletions src/dead_letters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl DeadLetterQueue for NoopDeadLetterQueue {
/// Implementation of the [DeadLetterQueue] trait that writes dead letter content as warn logs.
/// This implementation is currently only intended for debug development usage.
/// Be mindful of your PII when using this implementation.
#[allow(dead_code)]
pub(crate) struct LoggingDeadLetterQueue {}

#[async_trait]
Expand Down
23 changes: 23 additions & 0 deletions src/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ lazy_static! {
"epoch_seconds_to_iso8601",
Box::new(create_epoch_seconds_to_iso8601_fn()),
);
runtime.register_function(
"epoch_millis_to_iso8601",
Box::new(create_epoch_millis_to_iso8601_fn()),
);
runtime.register_function(
"epoch_micros_to_iso8601",
Box::new(create_epoch_micros_to_iso8601_fn()),
Expand Down Expand Up @@ -184,6 +188,13 @@ fn create_epoch_seconds_to_iso8601_fn() -> CustomFunction {
}

// TODO: Consolidate these custom function factories
fn create_epoch_millis_to_iso8601_fn() -> CustomFunction {
CustomFunction::new(
Signature::new(vec![ArgumentType::Number], None),
Box::new(jmespath_epoch_millis_to_iso8601),
)
}

fn create_epoch_micros_to_iso8601_fn() -> CustomFunction {
CustomFunction::new(
Signature::new(vec![ArgumentType::Number], None),
Expand Down Expand Up @@ -214,12 +225,14 @@ fn substr(args: &[Rcvar], context: &mut Context) -> Result<Rcvar, JmespathError>

enum EpochUnit {
Seconds(i64),
Milliseconds(i64),
Microseconds(i64),
}

fn iso8601_from_epoch(epoch_unit: EpochUnit) -> String {
let dt = match epoch_unit {
EpochUnit::Seconds(s) => Utc.timestamp_nanos(s * 1_000_000_000),
EpochUnit::Milliseconds(ms) => Utc.timestamp_nanos(ms * 1_000_000),
EpochUnit::Microseconds(u) => Utc.timestamp_nanos(u * 1000),
};

Expand All @@ -236,6 +249,16 @@ fn jmespath_epoch_seconds_to_iso8601(
Ok(Arc::new(variable))
}

fn jmespath_epoch_millis_to_iso8601(
args: &[Rcvar],
context: &mut Context,
) -> Result<Rcvar, JmespathError> {
let millis = i64_from_args(args, context, 0)?;
let value = serde_json::Value::String(iso8601_from_epoch(EpochUnit::Milliseconds(millis)));
let variable = Variable::try_from(value)?;
Ok(Arc::new(variable))
}

fn jmespath_epoch_micros_to_iso8601(
args: &[Rcvar],
context: &mut Context,
Expand Down

0 comments on commit 1fcc6c7

Please sign in to comment.