Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): construct java.sql.time/date object with a long type #13651

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import io.grpc.Status;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,6 +65,14 @@ public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
private static final SimpleDateFormat tDfm =
createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone);
private static final SimpleDateFormat tsDfm =
createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone);
private static final SimpleDateFormat tstzDfm =
createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone);

private final EsSinkConfig config;
private final BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
Expand Down Expand Up @@ -199,15 +208,23 @@ private Map<String, Object> buildDoc(SinkRow row)
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
switch (type) {
case DATE:
case TIME:
case TIMESTAMP:
case TIMESTAMPTZ:
// es client doesn't natively support java.sql.Timestamp/Time/Date
// so we need to convert Date/Time/Timestamp type into a string as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
case DATE:
col = col.toString();
break;
// construct java.sql.Time/Timestamp with milliseconds time value.
// it will use system timezone by default, so we have to set timezone manually
case TIME:
col = tDfm.format(col);
break;
case TIMESTAMP:
col = tsDfm.format(col);
break;
case TIMESTAMPTZ:
col = tstzDfm.format(col);
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
col =
Expand Down Expand Up @@ -315,4 +332,11 @@ public void drop() {
public RestHighLevelClient getClient() {
return client;
}

private static final SimpleDateFormat createSimpleDateFormat(
String pattern, TimeZone timeZone) {
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
sdf.setTimeZone(timeZone);
return sdf;
}
}
1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ normal = ["workspace-hack"]
anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
jni = "0.21.1"
Expand Down
92 changes: 28 additions & 64 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ use std::sync::{LazyLock, OnceLock};
use anyhow::anyhow;
use bytes::Bytes;
use cfg_or_panic::cfg_or_panic;
use chrono::NaiveDateTime;
use jni::objects::{
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString,
JValueGen, JValueOwned, ReleaseMode,
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JString, ReleaseMode,
};
use jni::signature::ReturnType;
use jni::sys::{
jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue, JNI_FALSE, JNI_TRUE,
};
Expand Down Expand Up @@ -234,8 +233,8 @@ struct JavaClassMethodCache {
big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>,
timestamp_ctor: OnceLock<(GlobalRef, JMethodID)>,

date_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
date_ctor: OnceLock<(GlobalRef, JMethodID)>,
time_ctor: OnceLock<(GlobalRef, JMethodID)>,
}

// TODO: may only return a RowRef
Expand Down Expand Up @@ -675,40 +674,22 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_date()
.0
.to_string();
// Constructs a Date object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_date();
let datetime = value.0.and_hms_opt(0, 0, 0).unwrap();
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (date_class_ref, constructor) =
pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Date")?;
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Date;",
)?;
let cls = env.find_class(gen_class_name!(java.sql.Date))?;
let init_method =
env.get_method_id(&cls, "<init>", gen_jni_sig!(void Date(long)))?;
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let JValueOwned::Object(date_obj) = env.call_static_method_unchecked(
<&JClass<'_>>::from(class_ref.as_obj()),
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)?
else {
return Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Date;".into(),
}));
};
let date_class = <&JClass<'_>>::from(date_class_ref.as_obj());
let date_obj =
env.new_object_unchecked(date_class, *constructor, &[jvalue { j: millis }])?;
Ok(date_obj)
}
})
Expand All @@ -721,41 +702,24 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_time()
.0
.to_string();
// Constructs a Time object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_time();
let epoch_date = NaiveDateTime::UNIX_EPOCH.date();
let datetime = epoch_date.and_time(value.0);
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (time_class_ref, constructor) =
pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Time")?;
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Time;",
)?;
let cls = env.find_class(gen_class_name!(java.sql.Time))?;
let init_method =
env.get_method_id(&cls, "<init>", gen_jni_sig!(void Time(long)))?;
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let class = <&JClass<'_>>::from(class_ref.as_obj());
match env.call_static_method_unchecked(
class,
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)? {
JValueGen::Object(obj) => Ok(obj),
_ => Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Time;".into(),
})),
}
let time_class = <&JClass<'_>>::from(time_class_ref.as_obj());
let time_obj =
env.new_object_unchecked(time_class, *constructor, &[jvalue { j: millis }])?;
Ok(time_obj)
}
})
}
Expand Down
Loading