Skip to content

Commit

Permalink
refactor(sink): refine sink trait and macro (#12478)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 26, 2023
1 parent b7c8c9d commit f38554e
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 392 deletions.
2 changes: 1 addition & 1 deletion e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ create sink invalid_sink_type from t with (connector = 'blackhole', type = 'inva
statement error `force_append_only` must be true or false
create sink invalid_force_append_only from t with (connector = 'blackhole', force_append_only = 'invalid');

statement error invalid connector type: invalid
statement error db error: ERROR: QueryError: internal error: Sink error: config error: unsupported sink connector invalid
create sink invalid_connector from t with (connector = 'invalid');

statement ok
Expand Down
35 changes: 2 additions & 33 deletions src/connector/src/sink/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::ConnectorClient;

use crate::sink::{Sink, SinkCommitCoordinator, SinkWriter, SinkWriterParam};
use crate::sink::{SinkCommitCoordinator, SinkWriter};

pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
pub type BoxSink =
Box<dyn Sink<Writer = BoxWriter<()>, Coordinator = BoxCoordinator> + Send + Sync + 'static>;

impl Debug for BoxSink {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("BoxSink")
}
}

#[async_trait]
impl<CM: 'static + Send> SinkWriter for BoxWriter<CM> {
Expand Down Expand Up @@ -70,24 +60,3 @@ impl SinkCommitCoordinator for BoxCoordinator {
self.deref_mut().commit(epoch, metadata).await
}
}

#[async_trait]
impl Sink for BoxSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;

async fn validate(&self) -> crate::sink::Result<()> {
self.deref().validate().await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
self.deref().new_writer(writer_param).await
}

async fn new_coordinator(
&self,
connector_client: Option<ConnectorClient>,
) -> crate::sink::Result<Self::Coordinator> {
self.deref().new_coordinator(connector_client).await
}
}
29 changes: 16 additions & 13 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use serde_with::serde_as;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::common::ClickHouseCommon;
use crate::sink::{
Result, Sink, SinkError, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Result, Sink, SinkError, SinkParam, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};

pub const CLICKHOUSE_SINK: &str = "clickhouse";
Expand Down Expand Up @@ -70,21 +71,22 @@ impl ClickHouseConfig {
}
}

impl ClickHouseSink {
pub fn new(
config: ClickHouseConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
impl TryFrom<SinkParam> for ClickHouseSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = ClickHouseConfig::from_hashmap(param.properties)?;
Ok(Self {
config,
schema,
pk_indices,
is_append_only,
pk_indices: param.downstream_pk,
is_append_only: param.sink_type.is_append_only(),
})
}
}

impl ClickHouseSink {
/// Check that the column names and types of risingwave and clickhouse are identical
fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
Expand Down Expand Up @@ -205,11 +207,12 @@ impl ClickHouseSink {
Ok(())
}
}
#[async_trait::async_trait]
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = ClickHouseSinkWriter;

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

async fn validate(&self) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
Expand Down Expand Up @@ -241,13 +244,13 @@ impl Sink for ClickHouseSink {
}

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
Ok(ClickHouseSinkWriter::new(
ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?)
.await
}
}
pub struct ClickHouseSinkWriter {
Expand Down
26 changes: 22 additions & 4 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_D
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::common::DorisCommon;
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
};

pub const DORIS_SINK: &str = "doris";
#[serde_as]
Expand Down Expand Up @@ -151,19 +153,20 @@ impl DorisSink {
}
}

#[async_trait]
impl Sink for DorisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = DorisSinkWriter;

const SINK_NAME: &'static str = DORIS_SINK;

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
Ok(DorisSinkWriter::new(
DorisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?)
.await
}

async fn validate(&self) -> Result<()> {
Expand Down Expand Up @@ -195,6 +198,21 @@ pub struct DorisSinkWriter {
row_encoder: JsonEncoder,
}

impl TryFrom<SinkParam> for DorisSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = DorisConfig::from_hashmap(param.properties)?;
DorisSink::new(
config,
schema,
param.downstream_pk,
param.sink_type.is_append_only(),
)
}
}

impl DorisSinkWriter {
pub async fn new(
config: DorisConfig,
Expand Down
28 changes: 22 additions & 6 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,21 @@ use super::{
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig};
use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";
pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";

pub type RemoteIcebergSink = CoordinatedRemoteSink;
pub type RemoteIcebergConfig = RemoteConfig;
#[derive(Debug)]
pub struct RemoteIceberg;

impl RemoteSinkTrait for RemoteIceberg {
const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
}

pub type RemoteIcebergSink = CoordinatedRemoteSink<RemoteIceberg>;

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -192,6 +198,15 @@ pub struct IcebergSink {
param: SinkParam,
}

impl TryFrom<SinkParam> for IcebergSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let config = IcebergConfig::from_hashmap(param.properties.clone())?;
IcebergSink::new(config, param)
}
}

impl Debug for IcebergSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IcebergSink")
Expand Down Expand Up @@ -240,11 +255,12 @@ impl IcebergSink {
}
}

#[async_trait::async_trait]
impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type Writer = CoordinatedSinkWriter<IcebergWriter>;

const SINK_NAME: &'static str = ICEBERG_SINK;

async fn validate(&self) -> Result<()> {
let _ = self.create_table().await?;
Ok(())
Expand All @@ -261,7 +277,7 @@ impl Sink for IcebergSink {
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
table,
};
Ok(CoordinatedSinkWriter::new(
CoordinatedSinkWriter::new(
writer_param
.meta_client
.expect("should have meta client")
Expand All @@ -275,7 +291,7 @@ impl Sink for IcebergSink {
})?,
inner,
)
.await?)
.await
}

async fn new_coordinator(
Expand Down
17 changes: 11 additions & 6 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,24 +281,29 @@ pub struct KafkaSink {
sink_from_name: String,
}

impl KafkaSink {
pub fn new(config: KafkaConfig, param: SinkParam) -> Self {
Self {
impl TryFrom<SinkParam> for KafkaSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = KafkaConfig::from_hashmap(param.properties)?;
Ok(Self {
config,
schema: param.schema(),
schema,
pk_indices: param.downstream_pk,
is_append_only: param.sink_type.is_append_only(),
db_name: param.db_name,
sink_from_name: param.sink_from_name,
}
})
}
}

#[async_trait::async_trait]
impl Sink for KafkaSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = SinkWriterV1Adapter<KafkaSinkWriter>;

const SINK_NAME: &'static str = KAFKA_SINK;

async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result<Self::Writer> {
Ok(SinkWriterV1Adapter::new(
KafkaSinkWriter::new(
Expand Down
17 changes: 11 additions & 6 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,29 @@ pub struct KinesisSink {
sink_from_name: String,
}

impl KinesisSink {
pub fn new(config: KinesisSinkConfig, param: SinkParam) -> Self {
Self {
impl TryFrom<SinkParam> for KinesisSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = KinesisSinkConfig::from_hashmap(param.properties)?;
Ok(Self {
config,
schema: param.schema(),
schema,
pk_indices: param.downstream_pk,
is_append_only: param.sink_type.is_append_only(),
db_name: param.db_name,
sink_from_name: param.sink_from_name,
}
})
}
}

#[async_trait::async_trait]
impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = KinesisSinkWriter;

const SINK_NAME: &'static str = KINESIS_SINK;

async fn validate(&self) -> Result<()> {
// For upsert Kafka sink, the primary key must be defined.
if !self.is_append_only && self.pk_indices.is_empty() {
Expand Down
Loading

0 comments on commit f38554e

Please sign in to comment.