refactor(connector): replace validate sink rpc with jni (risingwavela…
chenzl25 authored Sep 21, 2023
1 parent f075a6b commit a12611d
Showing 15 changed files with 150 additions and 42 deletions.
3 changes: 3 additions & 0 deletions ci/scripts/e2e-sink-test.sh
@@ -5,6 +5,9 @@ set -euo pipefail

source ci/scripts/common.sh

+# prepare environment
+export CONNECTOR_LIBS_PATH="./connector-node/libs"
+
while getopts 'p:' opt; do
case ${opt} in
p )
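(For orientation only, not part of this commit: the embedded JVM needs to find the connector-node jars, and CONNECTOR_LIBS_PATH points at that directory. Below is a minimal sketch of how such a directory could be turned into a classpath option, assuming the loader simply globs jars; the real logic lives in risingwave_jni_core::jvm_runtime and the helper name here is hypothetical.)

use std::{env, fs, io};

// Hypothetical helper: collect every .jar under CONNECTOR_LIBS_PATH into a
// -Djava.class.path option for the embedded JVM (':' separator on Unix).
fn connector_classpath_opt() -> io::Result<String> {
    let dir = env::var("CONNECTOR_LIBS_PATH").unwrap_or_else(|_| "./connector-node/libs".into());
    let jars: Vec<String> = fs::read_dir(&dir)?
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.extension().map_or(false, |ext| ext == "jar"))
        .map(|path| path.display().to_string())
        .collect();
    Ok(format!("-Djava.class.path={}", jars.join(":")))
}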
80 changes: 80 additions & 0 deletions …/com/risingwave/connector/JniSinkValidationHandler.java
@@ -0,0 +1,80 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector;

import static com.risingwave.connector.SinkUtils.getConnectorName;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.proto.ConnectorServiceProto;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JniSinkValidationHandler {
    static final Logger LOG = LoggerFactory.getLogger(SinkValidationHandler.class);

    public static byte[] validate(byte[] validateSinkRequestBytes)
            throws com.google.protobuf.InvalidProtocolBufferException {
        try {
            var request =
                    ConnectorServiceProto.ValidateSinkRequest.parseFrom(validateSinkRequestBytes);

            // For jni.rs
            java.lang.Thread.currentThread()
                    .setContextClassLoader(java.lang.ClassLoader.getSystemClassLoader());

            ConnectorServiceProto.SinkParam sinkParam = request.getSinkParam();
            TableSchema tableSchema = TableSchema.fromProto(sinkParam.getTableSchema());
            String connectorName = getConnectorName(request.getSinkParam());
            SinkFactory sinkFactory = SinkUtils.getSinkFactory(connectorName);
            sinkFactory.validate(
                    tableSchema, sinkParam.getPropertiesMap(), sinkParam.getSinkType());

            return ConnectorServiceProto.ValidateSinkResponse.newBuilder().build().toByteArray();
        } catch (IllegalArgumentException e) {
            LOG.error("sink validation failed", e);
            // Extract useful information from the error thrown by Jackson and convert it into a
            // more concise message.
            String errorMessage = e.getLocalizedMessage();
            Pattern missingFieldPattern = Pattern.compile("Missing creator property '([^']*)'");
            Pattern unrecognizedFieldPattern = Pattern.compile("Unrecognized field \"([^\"]*)\"");
            Matcher missingFieldMatcher = missingFieldPattern.matcher(errorMessage);
            Matcher unrecognizedFieldMatcher = unrecognizedFieldPattern.matcher(errorMessage);
            if (missingFieldMatcher.find()) {
                errorMessage = "missing field `" + missingFieldMatcher.group(1) + "`";
            } else if (unrecognizedFieldMatcher.find()) {
                errorMessage = "unknown field `" + unrecognizedFieldMatcher.group(1) + "`";
            }
            return ConnectorServiceProto.ValidateSinkResponse.newBuilder()
                    .setError(
                            ConnectorServiceProto.ValidationError.newBuilder()
                                    .setErrorMessage(errorMessage)
                                    .build())
                    .build()
                    .toByteArray();
        } catch (Exception e) {
            LOG.error("sink validation failed", e);
            return ConnectorServiceProto.ValidateSinkResponse.newBuilder()
                    .setError(
                            ConnectorServiceProto.ValidationError.newBuilder()
                                    .setErrorMessage(e.getMessage())
                                    .build())
                    .build()
                    .toByteArray();
        }
    }
}
4 changes: 2 additions & 2 deletions src/connector/src/sink/boxed.rs
@@ -76,8 +76,8 @@ impl Sink for BoxSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;

-async fn validate(&self, client: Option<ConnectorClient>) -> crate::sink::Result<()> {
-    self.deref().validate(client).await
+async fn validate(&self) -> crate::sink::Result<()> {
+    self.deref().validate().await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
3 changes: 1 addition & 2 deletions src/connector/src/sink/clickhouse.rs
@@ -23,7 +23,6 @@ use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl, Serial};
use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_rpc_client::ConnectorClient;
use serde::ser::{SerializeSeq, SerializeStruct};
use serde::Serialize;
use serde_derive::Deserialize;
@@ -179,7 +178,7 @@ impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = ClickHouseSinkWriter;

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
2 changes: 1 addition & 1 deletion src/connector/src/sink/iceberg.rs
@@ -245,7 +245,7 @@ impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type Writer = CoordinatedSinkWriter<IcebergWriter>;

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
let _ = self.create_table().await?;
Ok(())
}
3 changes: 1 addition & 2 deletions src/connector/src/sink/kafka.rs
@@ -27,7 +27,6 @@ use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
-use risingwave_rpc_client::ConnectorClient;
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};

@@ -298,7 +297,7 @@ impl Sink for KafkaSink {
))
}

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
// For upsert Kafka sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
3 changes: 1 addition & 2 deletions src/connector/src/sink/kinesis.rs
@@ -22,7 +22,6 @@ use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::Client as KinesisClient;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
-use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
@@ -69,7 +68,7 @@ impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = KinesisSinkWriter;

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
// For upsert Kafka sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
return Err(SinkError::Config(anyhow!(
6 changes: 4 additions & 2 deletions src/connector/src/sink/mod.rs
@@ -157,7 +157,7 @@ pub trait Sink {
type Writer: SinkWriter<CommitMetadata = ()>;
type Coordinator: SinkCommitCoordinator;

-async fn validate(&self, client: Option<ConnectorClient>) -> Result<()>;
+async fn validate(&self) -> Result<()>;
async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer>;
async fn new_coordinator(
&self,
@@ -346,7 +346,7 @@ impl Sink for BlackHoleSink {
Ok(Self)
}

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
Ok(())
}
}
@@ -519,6 +519,8 @@ pub enum SinkError {
Nats(anyhow::Error),
#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),
#[error("Internal error: {0}")]
Internal(anyhow::Error),
}

impl From<RpcError> for SinkError {
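(Side note, not part of the diff: the new Internal variant gives JVM/JNI failures a dedicated error path. A minimal sketch of the wrapping pattern it enables, mirroring the `SinkError::Internal(err.into())` calls in remote.rs below; the helper name is hypothetical.)

// Hypothetical helper: funnel any JNI/JVM failure into the new Internal variant.
fn internal_err(err: impl std::error::Error + Send + Sync + 'static) -> SinkError {
    SinkError::Internal(err.into())
}

// Usage sketch: jvm.attach_current_thread().map_err(internal_err)?;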
3 changes: 1 addition & 2 deletions src/connector/src/sink/nats.rs
@@ -19,7 +19,6 @@ use async_nats::jetstream::context::Context;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
-use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
@@ -85,7 +84,7 @@ impl Sink for NatsSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = NatsSinkWriter;

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::Nats(anyhow!(
"Nats sink only support append-only mode"
3 changes: 1 addition & 2 deletions src/connector/src/sink/pulsar.rs
@@ -24,7 +24,6 @@ use pulsar::producer::{Message, SendFuture};
use pulsar::{Producer, ProducerOptions, Pulsar, TokioExecutor};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
-use risingwave_rpc_client::ConnectorClient;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

@@ -171,7 +170,7 @@ impl Sink for PulsarSink {
.await
}

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
// For upsert Pulsar sink, the primary key must be defined.
if !self.is_append_only && self.downstream_pk.is_empty() {
return Err(SinkError::Config(anyhow!(
3 changes: 1 addition & 2 deletions src/connector/src/sink/redis.rs
@@ -15,7 +15,6 @@
use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
-use risingwave_rpc_client::ConnectorClient;

use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

@@ -40,7 +39,7 @@ impl Sink for RedisSink {
todo!()
}

-async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
todo!()
}
}
64 changes: 50 additions & 14 deletions src/connector/src/sink/remote.rs
@@ -14,21 +14,26 @@

use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;

use anyhow::anyhow;
use async_trait::async_trait;
use itertools::Itertools;
use jni::objects::{JByteArray, JValue, JValueOwned};
use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
use risingwave_common::types::DataType;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::json_payload::RowOp;
use risingwave_pb::connector_service::sink_writer_stream_request::write_batch::{
JsonPayload, Payload, StreamChunkPayload,
};
use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse;
-use risingwave_pb::connector_service::{SinkMetadata, SinkPayloadFormat};
+use risingwave_pb::connector_service::{
+    SinkMetadata, SinkPayloadFormat, ValidateSinkRequest, ValidateSinkResponse,
+};
#[cfg(test)]
use risingwave_pb::connector_service::{SinkWriterStreamRequest, SinkWriterStreamResponse};
use risingwave_rpc_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
@@ -112,7 +117,7 @@ impl Sink for RemoteSink {
.await?)
}

-async fn validate(&self, client: Option<ConnectorClient>) -> Result<()> {
+async fn validate(&self) -> Result<()> {
// FIXME: support struct and array in stream sink
self.param.columns.iter().map(|col| {
if matches!(
@@ -144,17 +149,48 @@
}
}).try_collect()?;

-let client = client.ok_or_else(|| {
-    SinkError::Remote(anyhow_error!(
-        "connector node endpoint not specified or unable to connect to connector node"
-    ))
-})?;
+let mut env = JVM
+    .as_ref()
+    .map_err(|err| SinkError::Internal(err.into()))?
+    .attach_current_thread()
+    .map_err(|err| SinkError::Internal(err.into()))?;
+let validate_sink_request = ValidateSinkRequest {
+    sink_param: Some(self.param.to_proto()),
+};
+let validate_sink_request_bytes = env
+    .byte_array_from_slice(&Message::encode_to_vec(&validate_sink_request))
+    .map_err(|err| SinkError::Internal(err.into()))?;
+
+let response = env
+    .call_static_method(
+        "com/risingwave/connector/JniSinkValidationHandler",
+        "validate",
+        "([B)[B",
+        &[JValue::Object(&validate_sink_request_bytes)],
+    )
+    .map_err(|err| SinkError::Internal(err.into()))?;

-// We validate a remote sink's accessibility as well as the pk.
-client
-    .validate_sink_properties(self.param.to_proto())
-    .await
-    .map_err(SinkError::from)
+let validate_sink_response_bytes = match response {
+    JValueOwned::Object(o) => unsafe { JByteArray::from_raw(o.into_raw()) },
+    _ => unreachable!(),
+};
+
+let validate_sink_response: ValidateSinkResponse = Message::decode(
+    risingwave_jni_core::to_guarded_slice(&validate_sink_response_bytes, &mut env)
+        .map_err(|err| SinkError::Internal(err.into()))?
+        .deref(),
+)
+.map_err(|err| SinkError::Internal(err.into()))?;
+
+validate_sink_response.error.map_or_else(
+    || Ok(()), // If there is no error message, return Ok here.
+    |err| {
+        Err(SinkError::Remote(anyhow!(format!(
+            "sink cannot pass validation: {}",
+            err.error_message
+        ))))
+    },
+)
}
}

@@ -166,8 +202,8 @@ impl Sink for CoordinatedRemoteSink {
type Coordinator = RemoteCoordinator;
type Writer = CoordinatedSinkWriter<CoordinatedRemoteSinkWriter>;

-async fn validate(&self, client: Option<ConnectorClient>) -> Result<()> {
-    self.0.validate(client).await
+async fn validate(&self) -> Result<()> {
+    self.0.validate().await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller.rs
@@ -447,7 +447,7 @@ impl DdlController {
}
StreamingJob::Sink(ref sink) => {
// Validate the sink on the connector node.
-validate_sink(sink, self.env.connector_client()).await?;
+validate_sink(sink).await?;
}
_ => {}
}
8 changes: 2 additions & 6 deletions src/meta/src/stream/sink.rs
@@ -16,18 +16,14 @@ use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_connector::sink::{build_sink, Sink, SinkParam};
use risingwave_pb::catalog::PbSink;
-use risingwave_rpc_client::ConnectorClient;

use crate::MetaResult;

-pub async fn validate_sink(
-    prost_sink_catalog: &PbSink,
-    connector_client: Option<ConnectorClient>,
-) -> MetaResult<()> {
+pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> {
let sink_catalog = SinkCatalog::from(prost_sink_catalog);
let param = SinkParam::from(sink_catalog);

let sink = build_sink(param)?;

-dispatch_sink!(sink, sink, { Ok(sink.validate(connector_client).await?) })
+dispatch_sink!(sink, sink, { Ok(sink.validate().await?) })
}
5 changes: 1 addition & 4 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
@@ -75,10 +75,7 @@ impl Sink for TestSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;

-async fn validate(
-    &self,
-    _client: Option<risingwave_rpc_client::ConnectorClient>,
-) -> risingwave_connector::sink::Result<()> {
+async fn validate(&self) -> risingwave_connector::sink::Result<()> {
Ok(())
}
