Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): replace validate sink rpc with jni #12312

Merged
merged 6 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ set -euo pipefail

source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
p )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;

import static com.risingwave.connector.SinkUtils.getConnectorName;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JNI entry point for sink validation.
 *
 * <p>The request and the response are exchanged as serialized protobuf messages ({@code
 * ValidateSinkRequest} in, {@code ValidateSinkResponse} out). Validation failures are reported
 * through the {@code error} field of the response rather than by throwing.
 */
public class JniSinkValidationHandler {
    static final Logger LOG = LoggerFactory.getLogger(JniSinkValidationHandler.class);

    // Compiled once: Pattern instances are immutable and safe to share between calls.
    private static final Pattern MISSING_FIELD_PATTERN =
            Pattern.compile("Missing creator property '([^']*)'");
    private static final Pattern UNRECOGNIZED_FIELD_PATTERN =
            Pattern.compile("Unrecognized field \"([^\"]*)\"");

    /**
     * Validates the sink described by a serialized {@code ValidateSinkRequest}.
     *
     * @param validateSinkRequestBytes protobuf-encoded {@code ValidateSinkRequest}
     * @return protobuf-encoded {@code ValidateSinkResponse}; the {@code error} field is left unset
     *     on success and carries a message when validation fails
     * @throws com.google.protobuf.InvalidProtocolBufferException kept in the signature for
     *     compatibility; a malformed request is caught below and reported in the response
     */
    public static byte[] validate(byte[] validateSinkRequestBytes)
            throws com.google.protobuf.InvalidProtocolBufferException {
        try {
            var request =
                    ConnectorServiceProto.ValidateSinkRequest.parseFrom(validateSinkRequestBytes);

            // For jni.rs: use the system class loader as this thread's context class loader.
            // NOTE(review): presumably needed because the calling thread is attached from native
            // code -- confirm against the Rust side.
            java.lang.Thread.currentThread()
                    .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());

            ConnectorServiceProto.SinkParam sinkParam = request.getSinkParam();
            TableSchema tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
            String connectorName = getConnectorName(sinkParam);
            SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
            sinkFactory.validate(
                    tableSchema, sinkParam.getPropertiesMap(), sinkParam.getSinkType());

            return ConnectorServiceProto.ValidateSinkResponse.newBuilder().build().toByteArray();
        } catch (IllegalArgumentException e) {
            LOG.error("sink validation failed", e);
            return errorResponse(conciseMessage(e));
        } catch (Exception e) {
            LOG.error("sink validation failed", e);
            String message = e.getMessage();
            // Protobuf setters reject null, so fall back to the exception's string form.
            return errorResponse(message != null ? message : e.toString());
        }
    }

    /**
     * Extracts useful information from the error thrown by Jackson and converts it into a more
     * concise message. Messages that match neither known pattern are returned unchanged.
     */
    private static String conciseMessage(IllegalArgumentException e) {
        String errorMessage = e.getLocalizedMessage();
        if (errorMessage == null) {
            // Nothing to match against; matcher(null) would throw.
            return e.toString();
        }
        Matcher missingFieldMatcher = MISSING_FIELD_PATTERN.matcher(errorMessage);
        if (missingFieldMatcher.find()) {
            return "missing field `" + missingFieldMatcher.group(1) + "`";
        }
        Matcher unrecognizedFieldMatcher = UNRECOGNIZED_FIELD_PATTERN.matcher(errorMessage);
        if (unrecognizedFieldMatcher.find()) {
            return "unknown field `" + unrecognizedFieldMatcher.group(1) + "`";
        }
        return errorMessage;
    }

    /** Builds a serialized {@code ValidateSinkResponse} carrying the given error message. */
    private static byte[] errorResponse(String errorMessage) {
        return ConnectorServiceProto.ValidateSinkResponse.newBuilder()
                .setError(
                        ConnectorServiceProto.ValidationError.newBuilder()
                                .setErrorMessage(errorMessage)
                                .build())
                .build()
                .toByteArray();
    }
}
4 changes: 2 additions & 2 deletions src/connector/src/sink/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ impl Sink for BoxSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;

async fn validate(&self, client: Option<ConnectorClient>) -> crate::sink::Result<()> {
self.deref().validate(client).await
async fn validate(&self) -> crate::sink::Result<()> {
self.deref().validate().await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl, Serial};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_rpc_client::ConnectorClient;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
use serde_derive::Deserialize;
Expand Down Expand Up @@ -179,7 +178,7 @@ impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = ClickHouseSinkWriter;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type Writer = CoordinatedSinkWriter<IcebergWriter>;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
let _ = self.create_table().await?;
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};

Expand Down Expand Up @@ -298,7 +297,7 @@ impl Sink for KafkaSink {
))
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
// For upsert Kafka sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::Client as KinesisClient;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -69,7 +68,7 @@ impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = KinesisSinkWriter;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
// For upsert Kinesis sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub trait Sink {
type Writer: SinkWriter<CommitMetadata = ()>;
type Coordinator: SinkCommitCoordinator;

async fn validate(&self, client: Option<ConnectorClient>) -> Result<()>;
async fn validate(&self) -> Result<()>;
async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer>;
async fn new_coordinator(
&self,
Expand Down Expand Up @@ -346,7 +346,7 @@ impl Sink for BlackHoleSink {
Ok(Self)
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
Ok(())
}
}
Expand Down Expand Up @@ -519,6 +519,8 @@ pub enum SinkError {
Nats(anyhow::Error),
#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),
#[error("Internal error: {0}")]
Internal(anyhow::Error),
}

impl From<RpcError> for SinkError {
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use async_nats::jetstream::context::Context;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -85,7 +84,7 @@ impl Sink for NatsSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = NatsSinkWriter;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::Nats(anyhow!(
"Nats sink only support append-only mode"
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use pulsar::producer::{Message, SendFuture};
use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

Expand Down Expand Up @@ -171,7 +170,7 @@ impl Sink for PulsarSink {
.await
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
// For upsert Pulsar sink, the primary key must be defined.
if !self.is_append_only && self.downstream_pk.is_empty() {
return Err(SinkError::Config(anyhow!(
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;

use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

Expand All @@ -40,7 +39,7 @@ impl Sink for RedisSink {
todo!()
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
todo!()
}
}
Expand Down
64 changes: 50 additions & 14 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@

use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;

use anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use jni::objects::{JByteArray, JValue, JValueOwned};
use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
use risingwave_common::types::DataType;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::json_payload::RowOp;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::{
JsonPayload, Payload, StreamChunkPayload,
};
use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
use risingwave_pb::connector_service::{SinkMetadata, SinkPayloadFormat};
use risingwave_pb::connector_service::{
SinkMetadata, SinkPayloadFormat, ValidateSinkRequest, ValidateSinkResponse,
};
#[cfg(test)]
use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
use risingwave_rpc_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
Expand Down Expand Up @@ -112,7 +117,7 @@ impl Sink for RemoteSink {
.await?)
}

async fn validate(&self, client: Option<ConnectorClient>) -> Result<()> {
async fn validate(&self) -> Result<()> {
// FIXME: support struct and array in stream sink
self.param.columns.iter().map(|col| {
if matches!(
Expand Down Expand Up @@ -144,17 +149,48 @@ impl Sink for RemoteSink {
}
}).try_collect()?;

let client = client.ok_or_else(|| {
SinkError::Remote(anyhow_error!(
"connector node endpoint not specified or unable to connect to connector node"
))
})?;
let mut env = JVM
.as_ref()
.map_err(|err| SinkError::Internal(err.into()))?
.attach_current_thread()
.map_err(|err| SinkError::Internal(err.into()))?;
let validate_sink_request = ValidateSinkRequest {
sink_param: Some(self.param.to_proto()),
};
let validate_sink_request_bytes = env
.byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))
.map_err(|err| SinkError::Internal(err.into()))?;

let response = env
.call_static_method(
"com/risingwave/connector/JniSinkValidationHandler",
"validate",
"([B)[B",
&[JValue::Object(&validate_sink_request_bytes)],
)
.map_err(|err| SinkError::Internal(err.into()))?;

// We validate a remote sink's accessibility as well as the pk.
client
.validate_sink_properties(self.param.to_proto())
.await
.map_err(SinkError::from)
let validate_sink_response_bytes = match response {
JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) },
_ => unreachable!(),
};

let validate_sink_response: ValidateSinkResponse = Message::decode(
risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env)
.map_err(|err| SinkError::Internal(err.into()))?
.deref(),
)
.map_err(|err| SinkError::Internal(err.into()))?;

validate_sink_response.error.map_or_else(
|| Ok(()), // If there is no error message, return Ok here.
|err| {
Err(SinkError::Remote(anyhow!(format!(
"sink cannot pass validation: {}",
err.error_message
))))
},
)
}
}

Expand All @@ -166,8 +202,8 @@ impl Sink for CoordinatedRemoteSink {
type Coordinator = RemoteCoordinator;
type Writer = CoordinatedSinkWriter<CoordinatedRemoteSinkWriter>;

async fn validate(&self, client: Option<ConnectorClient>) -> Result<()> {
self.0.validate(client).await
async fn validate(&self) -> Result<()> {
self.0.validate().await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ impl DdlController {
}
StreamingJob::Sink(ref sink) => {
// Validate the sink on the connector node.
validate_sink(sink, self.env.connector_client()).await?;
validate_sink(sink).await?;
}
_ => {}
}
Expand Down
8 changes: 2 additions & 6 deletions src/meta/src/stream/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_connector::sink::{build_sink, Sink, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_rpc_client::ConnectorClient;

use crate::MetaResult;

pub async fn validate_sink(
prost_sink_catalog: &PbSink,
connector_client: Option<ConnectorClient>,
) -> MetaResult<()> {
pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> {
let sink_catalog = SinkCatalog::from(prost_sink_catalog);
let param = SinkParam::from(sink_catalog);

let sink = build_sink(param)?;

dispatch_sink!(sink, sink, { Ok(sink.validate(connector_client).await?) })
dispatch_sink!(sink, sink, { Ok(sink.validate().await?) })
}
5 changes: 1 addition & 4 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ impl Sink for TestSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;

async fn validate(
&self,
_client: Option<risingwave_rpc_client::ConnectorClient>,
) -> risingwave_connector::sink::Result<()> {
async fn validate(&self) -> risingwave_connector::sink::Result<()> {
Ok(())
}

Expand Down
Loading