Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): construct java.sql.time/date object with a long type #13651

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import io.grpc.Status;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -199,15 +200,29 @@ private Map<String, Object> buildDoc(SinkRow row)
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
switch (type) {
case DATE:
case TIME:
case TIMESTAMP:
case TIMESTAMPTZ:
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date/Time/Timestamp type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
case DATE:
col = col.toString();
break;
// construct java.sql.Time/Timestamp with milliseconds time value.
// it will use system timezone by default, so we have to set timezone manually
case TIME:
SimpleDateFormat tDfm = new SimpleDateFormat("HH:mm:ss.SSS");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The formatter can be stored as a static field to avoid repeatedly creating the objects.

Copy link
Contributor Author

@xuefengze xuefengze Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleDateFormat instances aren’t thread-safe. Using a static formatter seems to lead to incorrect data and test failures. https://buildkite.com/risingwavelabs/pull-request/builds/36192#018c1499-1f24-4950-a41a-d703aaaec4e2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's one of the many reasons to prefer java.time.*. But yes, it may need a prior refactor for other remote sinks as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleDateFormat instances aren’t thread-safe. Using a static formatter seems to lead to incorrect data and test failures. https://buildkite.com/risingwavelabs/pull-request/builds/36192#018c1499-1f24-4950-a41a-d703aaaec4e2

If so, we can store them as ordinary (non-static) fields instead of static fields and create an instance for each sink writer, since within a single sink writer it's single-threaded.

tDfm.setTimeZone(TimeZone.getTimeZone("UTC"));
col = tDfm.format(col);
break;
case TIMESTAMP:
SimpleDateFormat tsDfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
tsDfm.setTimeZone(TimeZone.getTimeZone("UTC"));
col = tsDfm.format(col);
break;
case TIMESTAMPTZ:
SimpleDateFormat tszDfm = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
tszDfm.setTimeZone(TimeZone.getTimeZone("UTC"));
col = tszDfm.format(col);
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
col =
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ normal = ["workspace-hack"]
anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
jni = "0.21.1"
Expand Down
86 changes: 24 additions & 62 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ use std::sync::{LazyLock, OnceLock};
use anyhow::anyhow;
use bytes::Bytes;
use cfg_or_panic::cfg_or_panic;
use chrono::NaiveDateTime;
use jni::objects::{
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString,
JValueGen, JValueOwned, ReleaseMode,
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JString, ReleaseMode,
};
use jni::signature::ReturnType;
use jni::sys::{
jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue, JNI_FALSE, JNI_TRUE,
};
Expand Down Expand Up @@ -234,8 +233,8 @@ struct JavaClassMethodCache {
big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>,
timestamp_ctor: OnceLock<(GlobalRef, JMethodID)>,

date_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
date_ctor: OnceLock<(GlobalRef, JMethodID)>,
time_ctor: OnceLock<(GlobalRef, JMethodID)>,
}

// TODO: may only return a RowRef
Expand Down Expand Up @@ -675,40 +674,21 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_date()
.0
.to_string();
// Constructs a Date object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_date();
let datetime = value.0.and_hms_opt(0, 0, 0).unwrap();
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (date_class_ref, constructor) =
pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Date")?;
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Date;",
)?;
let init_method = env.get_method_id(&cls, "<init>", "(J)V")?;
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let JValueOwned::Object(date_obj) = env.call_static_method_unchecked(
<&JClass<'_>>::from(class_ref.as_obj()),
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)?
else {
return Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Date;".into(),
}));
};
let date_class = <&JClass<'_>>::from(date_class_ref.as_obj());
let date_obj =
env.new_object_unchecked(date_class, *constructor, &[jvalue { j: millis }])?;
Ok(date_obj)
}
})
Expand All @@ -721,41 +701,23 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_time()
.0
.to_string();
// Constructs a Time object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_time();
let epoch_date = NaiveDateTime::UNIX_EPOCH.date();
let datetime = epoch_date.and_time(value.0);
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (time_class_ref, constructor) =
pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Time")?;
xuefengze marked this conversation as resolved.
Show resolved Hide resolved
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Time;",
)?;
let init_method = env.get_method_id(&cls, "<init>", "(J)V")?;
xuefengze marked this conversation as resolved.
Show resolved Hide resolved
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let class = <&JClass<'_>>::from(class_ref.as_obj());
match env.call_static_method_unchecked(
class,
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)? {
JValueGen::Object(obj) => Ok(obj),
_ => Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Time;".into(),
})),
}
let time_class = <&JClass<'_>>::from(time_class_ref.as_obj());
let time_obj =
env.new_object_unchecked(time_class, *constructor, &[jvalue { j: millis }])?;
Ok(time_obj)
}
})
}
Expand Down
Loading