From d53a958f24f03bebd4ab5cc72fce0630a2d98512 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 14 Sep 2023 16:55:12 +0800 Subject: [PATCH 1/3] replace validate sink rpc with jni --- .../connector/JniSinkValidationHandler.java | 80 +++++++++++++++++++ src/connector/src/sink/boxed.rs | 4 +- src/connector/src/sink/clickhouse.rs | 3 +- src/connector/src/sink/iceberg.rs | 2 +- src/connector/src/sink/kafka.rs | 3 +- src/connector/src/sink/kinesis.rs | 3 +- src/connector/src/sink/mod.rs | 6 +- src/connector/src/sink/nats.rs | 3 +- src/connector/src/sink/redis.rs | 3 +- src/connector/src/sink/remote.rs | 64 +++++++++++---- src/meta/src/rpc/ddl_controller.rs | 2 +- src/meta/src/stream/sink.rs | 8 +- .../tests/integration_tests/sink/basic.rs | 5 +- 13 files changed, 146 insertions(+), 40 deletions(-) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkValidationHandler.java diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkValidationHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkValidationHandler.java new file mode 100644 index 000000000000..3adfa4265870 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JniSinkValidationHandler.java @@ -0,0 +1,80 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector; + +import static com.risingwave.connector.SinkUtils.getConnectorName; + +import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.api.sink.SinkFactory; +import com.risingwave.proto.ConnectorServiceProto; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JniSinkValidationHandler { + static final Logger LOG = LoggerFactory.getLogger(SinkValidationHandler.class); + + public static byte[] validate(byte[] validateSinkRequestBytes) + throws com.google.protobuf.InvalidProtocolBufferException { + try { + var request = + ConnectorServiceProto.ValidateSinkRequest.parseFrom(validateSinkRequestBytes); + + // For jni.rs + java.lang.Thread.currentThread() + .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader()); + + ConnectorServiceProto.SinkParam sinkParam = request.getSinkParam(); + TableSchema tableSchema = TableSchema.fromProto(sinkParam.getTableSchema()); + String connectorName = getConnectorName(request.getSinkParam()); + SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName); + sinkFactory.validate( + tableSchema, sinkParam.getPropertiesMap(), sinkParam.getSinkType()); + + return ConnectorServiceProto.ValidateSinkResponse.newBuilder().build().toByteArray(); + } catch (IllegalArgumentException e) { + LOG.error("sink validation failed", e); + // Extract useful information from the error thrown by Jackson and convert it into a + // more concise message. + String errorMessage = e.getLocalizedMessage(); + Pattern missingFieldPattern = Pattern.compile("Missing creator property '([^']*)'"); + Pattern unrecognizedFieldPattern = Pattern.compile("Unrecognized field \"([^\"]*)\""); + Matcher missingFieldMatcher = missingFieldPattern.matcher(errorMessage); + Matcher unrecognizedFieldMatcher = unrecognizedFieldPattern.matcher(errorMessage); + if (missingFieldMatcher.find()) { + errorMessage = "missing field `" + missingFieldMatcher.group(1) + "`"; + } else if (unrecognizedFieldMatcher.find()) { + errorMessage = "unknown field `" + unrecognizedFieldMatcher.group(1) + "`"; + } + return ConnectorServiceProto.ValidateSinkResponse.newBuilder() + .setError( + ConnectorServiceProto.ValidationError.newBuilder() + .setErrorMessage(errorMessage) + .build()) + .build() + .toByteArray(); + } catch (Exception e) { + LOG.error("sink validation failed", e); + return ConnectorServiceProto.ValidateSinkResponse.newBuilder() + .setError( + ConnectorServiceProto.ValidationError.newBuilder() + .setErrorMessage(e.getMessage()) + .build()) + .build() + .toByteArray(); + } + } +} diff --git a/src/connector/src/sink/boxed.rs b/src/connector/src/sink/boxed.rs index 0a7082961a00..d966b1b6dea0 100644 --- a/src/connector/src/sink/boxed.rs +++ b/src/connector/src/sink/boxed.rs @@ -76,8 +76,8 @@ impl Sink for BoxSink { type Coordinator = BoxCoordinator; type Writer = BoxWriter<()>; - async fn validate(&self, client: Option) -> crate::sink::Result<()> { - self.deref().validate(client).await + async fn validate(&self) -> crate::sink::Result<()> { + self.deref().validate().await } async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result { diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 187b87397dbf..8242026c81f5 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -23,7 +23,6 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DataType, ScalarRefImpl, Serial}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_rpc_client::ConnectorClient; use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; use serde_derive::Deserialize; @@ -179,7 +178,7 @@ impl Sink for ClickHouseSink { type Coordinator = DummySinkCommitCoordinator; type Writer = ClickHouseSinkWriter; - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { // For upsert clickhouse sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { return Err(SinkError::Config(anyhow!( diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 451c8b2686ec..e2da4726883c 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -234,7 +234,7 @@ impl Sink for IcebergSink { type Coordinator = IcebergSinkCommitter; type Writer = CoordinatedSinkWriter; - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { let _ = self.create_table().await?; Ok(()) } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 72234bcbbb90..8b3fe9f18728 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -28,7 +28,6 @@ use rdkafka::types::RDKafkaErrorCode; use rdkafka::ClientConfig; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_rpc_client::ConnectorClient; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; use serde_with::{serde_as, DisplayFromStr}; @@ -301,7 +300,7 @@ impl Sink for KafkaSink { )) } - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { // For upsert Kafka sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { return Err(SinkError::Config(anyhow!( diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index b73b48771c2b..3efc371b5897 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -23,7 +23,6 @@ use aws_sdk_kinesis::Client as KinesisClient; use futures_async_stream::for_await; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -71,7 +70,7 @@ impl Sink for KinesisSink { type Coordinator = DummySinkCommitCoordinator; type Writer = KinesisSinkWriter; - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { // For upsert Kafka sink, the primary key must be defined. if !self.is_append_only && self.pk_indices.is_empty() { return Err(SinkError::Config(anyhow!( diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3e785edc7a9e..208c0f92f17f 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -151,7 +151,7 @@ pub trait Sink { type Writer: SinkWriter; type Coordinator: SinkCommitCoordinator; - async fn validate(&self, client: Option) -> Result<()>; + async fn validate(&self) -> Result<()>; async fn new_writer(&self, writer_param: SinkWriterParam) -> Result; async fn new_coordinator( &self, @@ -301,7 +301,7 @@ impl Sink for BlackHoleSink { Ok(Self) } - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { Ok(()) } } @@ -467,6 +467,8 @@ pub enum SinkError { ClickHouse(String), #[error("Nats error: {0}")] Nats(anyhow::Error), + #[error("Internal error: {0}")] + Internal(anyhow::Error), } impl From for SinkError { diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index c2408acbdab9..e04235b6f586 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -19,7 +19,6 @@ use async_nats::jetstream::context::Context; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; -use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; use serde_with::serde_as; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -85,7 +84,7 @@ impl Sink for NatsSink { type Coordinator = DummySinkCommitCoordinator; type Writer = NatsSinkWriter; - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Nats(anyhow!( "Nats sink only support append-only mode" diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index e5afa88c38b2..f7957335aa1e 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -15,7 +15,6 @@ use async_trait::async_trait; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_rpc_client::ConnectorClient; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; @@ -40,7 +39,7 @@ impl Sink for RedisSink { todo!() } - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { todo!() } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 851c81b8916d..7e659bf9e3af 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -14,21 +14,26 @@ use std::collections::HashMap; use std::marker::PhantomData; +use std::ops::Deref; use anyhow::anyhow; use async_trait::async_trait; use itertools::Itertools; +use jni::objects::{JByteArray, JValue, JValueOwned}; use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; +use risingwave_jni_core::jvm_runtime::JVM; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::json_payload::RowOp; use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::{ JsonPayload, Payload, StreamChunkPayload, }; use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse; -use risingwave_pb::connector_service::{SinkMetadata, SinkPayloadFormat}; +use risingwave_pb::connector_service::{ + SinkMetadata, SinkPayloadFormat, ValidateSinkRequest, ValidateSinkResponse, +}; #[cfg(test)] use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse}; use risingwave_rpc_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; @@ -112,7 +117,7 @@ impl Sink for RemoteSink { .await?) } - async fn validate(&self, client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { // FIXME: support struct and array in stream sink self.param.columns.iter().map(|col| { if matches!( @@ -144,17 +149,48 @@ impl Sink for RemoteSink { } }).try_collect()?; - let client = client.ok_or_else(|| { - SinkError::Remote(anyhow_error!( - "connector node endpoint not specified or unable to connect to connector node" - )) - })?; + let mut env = JVM + .as_ref() + .map_err(|err| SinkError::Internal(err.into()))? + .attach_current_thread() + .map_err(|err| SinkError::Internal(err.into()))?; + let validate_sink_request = ValidateSinkRequest { + sink_param: Some(self.param.to_proto()), + }; + let validate_sink_request_bytes = env + .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request)) + .map_err(|err| SinkError::Internal(err.into()))?; + + let response = env + .call_static_method( + "com/risingwave/connector/JniSinkValidationHandler", + "validate", + "([B)[B", + &[JValue::Object(&validate_sink_request_bytes)], + ) + .map_err(|err| SinkError::Internal(err.into()))?; - // We validate a remote sink's accessibility as well as the pk. - client - .validate_sink_properties(self.param.to_proto()) - .await - .map_err(SinkError::from) + let validate_sink_response_bytes = match response { + JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) }, + _ => unreachable!(), + }; + + let validate_sink_response: ValidateSinkResponse = Message::decode( + risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env) + .map_err(|err| SinkError::Internal(err.into()))? + .deref(), + ) + .map_err(|err| SinkError::Internal(err.into()))?; + + validate_sink_response.error.map_or_else( + || Ok(()), // If there is no error message, return Ok here. + |err| { + Err(SinkError::Remote(anyhow!(format!( + "sink cannot pass validation: {}", + err.error_message + )))) + }, + ) } } @@ -166,8 +202,8 @@ impl Sink for CoordinatedRemoteSink { type Coordinator = RemoteCoordinator; type Writer = CoordinatedSinkWriter; - async fn validate(&self, client: Option) -> Result<()> { - self.0.validate(client).await + async fn validate(&self) -> Result<()> { + self.0.validate().await } async fn new_writer(&self, writer_param: SinkWriterParam) -> Result { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 254e1ae5eca2..59d9b082e01b 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -436,7 +436,7 @@ impl DdlController { } StreamingJob::Sink(sink) => { // Validate the sink on the connector node. - validate_sink(sink, self.env.connector_client()).await?; + validate_sink(sink).await?; } _ => {} } diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs index 3f46b781bd6b..4717a1ffdfe9 100644 --- a/src/meta/src/stream/sink.rs +++ b/src/meta/src/stream/sink.rs @@ -16,18 +16,14 @@ use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkCatalog; use risingwave_connector::sink::{build_sink, Sink, SinkParam}; use risingwave_pb::catalog::PbSink; -use risingwave_rpc_client::ConnectorClient; use crate::MetaResult; -pub async fn validate_sink( - prost_sink_catalog: &PbSink, - connector_client: Option, -) -> MetaResult<()> { +pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> { let sink_catalog = SinkCatalog::from(prost_sink_catalog); let param = SinkParam::from(sink_catalog); let sink = build_sink(param)?; - dispatch_sink!(sink, sink, { Ok(sink.validate(connector_client).await?) }) + dispatch_sink!(sink, sink, { Ok(sink.validate().await?) }) } diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index a12bc3643b54..b0e044377ab8 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -75,10 +75,7 @@ impl Sink for TestSink { type Coordinator = BoxCoordinator; type Writer = BoxWriter<()>; - async fn validate( - &self, - _client: Option, - ) -> risingwave_connector::sink::Result<()> { + async fn validate(&self) -> Result<()> { Ok(()) } From f5c3337b795581f259751e8f4e2497c129ff38a6 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 14 Sep 2023 18:04:59 +0800 Subject: [PATCH 2/3] fix --- ci/scripts/e2e-sink-test.sh | 3 +++ src/tests/simulation/tests/integration_tests/sink/basic.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index e1b700ee0d17..2dc02f0eada7 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -5,6 +5,9 @@ set -euo pipefail source ci/scripts/common.sh +# prepare environment +export CONNECTOR_LIBS_PATH="./connector-node/libs" + while getopts 'p:' opt; do case ${opt} in p ) diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index b0e044377ab8..c0f9f7253f37 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -75,7 +75,7 @@ impl Sink for TestSink { type Coordinator = BoxCoordinator; type Writer = BoxWriter<()>; - async fn validate(&self) -> Result<()> { + async fn validate(&self) -> risingwave_connector::sink::Result<()> { Ok(()) } From 43eb96b1dd07126950cd30b0b9b7b6151bac8e59 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 20 Sep 2023 15:50:06 +0800 Subject: [PATCH 3/3] fmt --- src/connector/src/sink/pulsar.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 306fc2046b71..d1cbc0b5a527 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -24,7 +24,6 @@ use pulsar::producer::{Message, SendFuture}; use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; -use risingwave_rpc_client::ConnectorClient; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; @@ -171,7 +170,7 @@ impl Sink for PulsarSink { .await } - async fn validate(&self, _client: Option) -> Result<()> { + async fn validate(&self) -> Result<()> { // For upsert Pulsar sink, the primary key must be defined. if !self.is_append_only && self.downstream_pk.is_empty() { return Err(SinkError::Config(anyhow!(