Skip to content

Commit

Permalink
feat(sink): turn sink writer into higher level log sinker (risingwave…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 26, 2023
1 parent 1ff58bc commit 471bfa2
Show file tree
Hide file tree
Showing 31 changed files with 677 additions and 492 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6
] }
csv = "1.2"
duration-str = "0.5.1"
easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
Expand Down
68 changes: 68 additions & 0 deletions src/connector/src/sink/blackhole.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset};
use crate::sink::{
DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkError, SinkParam, SinkWriterParam,
};

pub const BLACKHOLE_SINK: &str = "blackhole";

/// Stateless sink registered under the [`BLACKHOLE_SINK`] name.
///
/// The type carries no fields; its `LogSinker` implementation in this file
/// reads items from the log reader and truncates the log without forwarding
/// the data anywhere.
#[derive(Debug)]
pub struct BlackHoleSink;

impl TryFrom<SinkParam> for BlackHoleSink {
type Error = SinkError;

fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
Ok(Self)
}
}

impl Sink for BlackHoleSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = Self;

const SINK_NAME: &'static str = BLACKHOLE_SINK;

async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(Self)
}

async fn validate(&self) -> Result<()> {
Ok(())
}
}

/// `LogSinker` implementation that consumes the log without writing it out.
impl LogSinker for BlackHoleSink {
    /// Initializes the reader, then loops forever reading one item at a time.
    ///
    /// Stream chunks and barriers are acknowledged by truncating the log at
    /// the corresponding offset; vnode bitmap updates are skipped without a
    /// truncate call. The loop has no normal exit: the function returns only
    /// when `init`, `next_item` or `truncate` yields an error.
    async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> {
        log_reader.init().await?;
        loop {
            let (epoch, entry) = log_reader.next_item().await?;

            // Work out which offset (if any) this item lets us truncate to.
            let truncate_to = match entry {
                LogStoreReadItem::StreamChunk { chunk_id, .. } => {
                    Some(TruncateOffset::Chunk { epoch, chunk_id })
                }
                LogStoreReadItem::Barrier { .. } => Some(TruncateOffset::Barrier { epoch }),
                LogStoreReadItem::UpdateVnodeBitmap(_) => None,
            };

            if let Some(offset) = truncate_to {
                log_reader.truncate(offset).await?;
            }
        }
    }
}
13 changes: 7 additions & 6 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use serde_with::serde_as;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::common::ClickHouseCommon;
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
Result, Sink, SinkError, SinkParam, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

pub const CLICKHOUSE_SINK: &str = "clickhouse";
Expand Down Expand Up @@ -209,7 +209,7 @@ impl ClickHouseSink {
}
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = ClickHouseSinkWriter;
type LogSinker = LogSinkerOf<ClickHouseSinkWriter>;

const SINK_NAME: &'static str = CLICKHOUSE_SINK;

Expand Down Expand Up @@ -243,14 +243,15 @@ impl Sink for ClickHouseSink {
Ok(())
}

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
ClickHouseSinkWriter::new(
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await
.await?
.into_log_sinker(writer_param.sink_metrics))
}
}
pub struct ClickHouseSinkWriter {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
use tracing::warn;

use crate::sink::{Result, SinkError, SinkParam, SinkWriter};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkError, SinkParam};

pub struct CoordinatedSinkWriter<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
epoch: u64,
Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_D
use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use crate::common::DorisCommon;
use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
};
Expand Down Expand Up @@ -155,18 +156,19 @@ impl DorisSink {

impl Sink for DorisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = DorisSinkWriter;
type LogSinker = LogSinkerOf<DorisSinkWriter>;

const SINK_NAME: &'static str = DORIS_SINK;

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
DorisSinkWriter::new(
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(DorisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await
.await?
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
Expand Down
13 changes: 7 additions & 6 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ use serde_json::Value;
use url::Url;

use super::{
Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it is ready, we will change this name to "iceberg".
Expand Down Expand Up @@ -257,7 +257,7 @@ impl IcebergSink {

impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type Writer = CoordinatedSinkWriter<IcebergWriter>;
type LogSinker = LogSinkerOf<CoordinatedSinkWriter<IcebergWriter>>;

const SINK_NAME: &'static str = ICEBERG_SINK;

Expand All @@ -266,7 +266,7 @@ impl Sink for IcebergSink {
Ok(())
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let table = self.create_table().await?;

let inner = IcebergWriter {
Expand All @@ -277,7 +277,7 @@ impl Sink for IcebergSink {
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
table,
};
CoordinatedSinkWriter::new(
Ok(CoordinatedSinkWriter::new(
writer_param
.meta_client
.expect("should have meta client")
Expand All @@ -291,7 +291,8 @@ impl Sink for IcebergSink {
})?,
inner,
)
.await
.await?
.into_log_sinker(writer_param.sink_metrics))
}

async fn new_coordinator(
Expand Down
16 changes: 9 additions & 7 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};

use super::{
FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};
use crate::common::KafkaCommon;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::{
DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter,
use crate::sink::writer::{
FormattedSink, LogSinkerOf, SinkWriterExt, SinkWriterV1, SinkWriterV1Adapter,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
Expand Down Expand Up @@ -300,11 +301,11 @@ impl TryFrom<SinkParam> for KafkaSink {

impl Sink for KafkaSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = SinkWriterV1Adapter<KafkaSinkWriter>;
type LogSinker = LogSinkerOf<SinkWriterV1Adapter<KafkaSinkWriter>>;

const SINK_NAME: &'static str = KAFKA_SINK;

async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result<Self::Writer> {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(SinkWriterV1Adapter::new(
KafkaSinkWriter::new(
self.config.clone(),
Expand All @@ -318,7 +319,8 @@ impl Sink for KafkaSink {
)?,
)
.await?,
))
)
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
Expand Down
16 changes: 9 additions & 7 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ use serde_with::serde_as;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use super::{FormattedSink, SinkParam};
use super::SinkParam;
use crate::common::KinesisCommon;
use crate::dispatch_sink_formatter_impl;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

pub const KINESIS_SINK: &str = "kinesis";
Expand Down Expand Up @@ -66,7 +67,7 @@ impl TryFrom<SinkParam> for KinesisSink {

impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = KinesisSinkWriter;
type LogSinker = LogSinkerOf<KinesisSinkWriter>;

const SINK_NAME: &'static str = KINESIS_SINK;

Expand All @@ -93,16 +94,17 @@ impl Sink for KinesisSink {
Ok(())
}

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
KinesisSinkWriter::new(
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(KinesisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await
.await?
.into_log_sinker(writer_param.sink_metrics))
}
}

Expand Down
Loading

0 comments on commit 471bfa2

Please sign in to comment.