Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): turn sink writer into higher level log sinker #12152

Merged
merged 22 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d14531a
feat(log-store): refine log store trait and add update vnode bitmap i…
wenym1 Aug 30, 2023
995a9ec
fix deterministic build
wenym1 Aug 31, 2023
918174f
Merge branch 'main' into yiming/log-store-read-update-vnode
wenym1 Sep 6, 2023
cc48d78
feat(sink): turn sink writer with higher level log sinker
wenym1 Sep 7, 2023
62edb22
Merge branch 'main' into yiming/log-store-read-update-vnode
wenym1 Sep 7, 2023
cadde76
Merge branch 'yiming/log-store-read-update-vnode' into yiming/sink-as…
wenym1 Sep 7, 2023
69a1972
fix license and refactor with easy_ext
wenym1 Sep 7, 2023
60e2ae7
Merge branch 'main' into yiming/sink-as-log-consumer
wenym1 Sep 11, 2023
c3dc9f7
Merge branch 'main' into yiming/sink-as-log-consumer
wenym1 Sep 14, 2023
5073c42
Merge branch 'main' into yiming/sink-as-log-consumer
wenym1 Sep 18, 2023
61d19e8
Merge branch 'main' into yiming/sink-as-log-consumer
wenym1 Sep 19, 2023
8215530
Merge branch 'main' into yiming/sink-as-log-consumer
wenym1 Sep 20, 2023
a0ca31d
refactor(sink): refine sink trait and macro
wenym1 Sep 21, 2023
3b37697
fix linter
wenym1 Sep 21, 2023
e65a806
refine code
wenym1 Sep 21, 2023
0967c3f
Merge branch 'main' into yiming/refine-sink-trait
wenym1 Sep 26, 2023
fd685d8
fix unit test
wenym1 Sep 26, 2023
c855fbf
Merge branch 'main' into yiming/refine-sink-trait
wenym1 Sep 26, 2023
0daa499
Merge branch 'yiming/refine-sink-trait' into yiming/sink-as-log-consumer
wenym1 Sep 26, 2023
76334f8
Merge branch 'main' into yiming/refine-sink-trait
wenym1 Sep 26, 2023
dd0f6b0
Merge branch 'yiming/refine-sink-trait' into yiming/sink-as-log-consumer
wenym1 Sep 26, 2023
90d2028
remove dead code
wenym1 Sep 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6
] }
csv = "1.2"
duration-str = "0.5.1"
easy-ext = "1"
enum-as-inner = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
Expand Down
60 changes: 60 additions & 0 deletions src/connector/src/sink/blackhole.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
use risingwave_rpc_client::ConnectorClient;

use crate::sink::log_store::{LogReader, LogStoreReadItem, TruncateOffset};
use crate::sink::{DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkWriterParam};

pub const BLACKHOLE_SINK: &str = "blackhole";

#[derive(Debug)]
pub struct BlackHoleSink;

#[async_trait]
impl Sink for BlackHoleSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = Self;

async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(Self)
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
Ok(())
}
}

impl LogSinker for BlackHoleSink {
async fn consume_log_and_sink(self, mut log_reader: impl LogReader) -> Result<()> {
log_reader.init().await?;
loop {
let (epoch, item) = log_reader.next_item().await?;
match item {
LogStoreReadItem::StreamChunk { chunk_id, .. } => {
log_reader
.truncate(TruncateOffset::Chunk { epoch, chunk_id })
.await?;
}
LogStoreReadItem::Barrier { .. } => {
log_reader
.truncate(TruncateOffset::Barrier { epoch })
.await?;
}
LogStoreReadItem::UpdateVnodeBitmap(_) => {}
}
}
}
}
20 changes: 14 additions & 6 deletions src/connector/src/sink/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::ConnectorClient;

use crate::sink::{Sink, SinkCommitCoordinator, SinkWriter, SinkWriterParam};
use crate::sink::writer::{LogSinkerOf, SinkWriter};
use crate::sink::{Sink, SinkCommitCoordinator, SinkWriterParam};

pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
pub type BoxSink =
Box<dyn Sink<Writer = BoxWriter<()>, Coordinator = BoxCoordinator> + Send + Sync + 'static>;
pub type BoxSink = Box<
dyn Sink<LogSinker = LogSinkerOf<BoxWriter<()>>, Coordinator = BoxCoordinator>
+ Send
+ Sync
+ 'static,
>;

impl Debug for BoxSink {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -74,14 +79,17 @@ impl SinkCommitCoordinator for BoxCoordinator {
#[async_trait]
impl Sink for BoxSink {
type Coordinator = BoxCoordinator;
type Writer = BoxWriter<()>;
type LogSinker = LogSinkerOf<BoxWriter<()>>;

async fn validate(&self, client: Option<ConnectorClient>) -> crate::sink::Result<()> {
self.deref().validate(client).await
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
self.deref().new_writer(writer_param).await
async fn new_log_sinker(
&self,
writer_param: SinkWriterParam,
) -> crate::sink::Result<Self::LogSinker> {
self.deref().new_log_sinker(writer_param).await
}

async fn new_coordinator(
Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use serde_with::serde_as;

use super::{DummySinkCommitCoordinator, SinkWriterParam};
use crate::common::ClickHouseCommon;
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
Result, Sink, SinkError, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Result, Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

pub const CLICKHOUSE_SINK: &str = "clickhouse";
Expand Down Expand Up @@ -177,7 +178,7 @@ impl ClickHouseSink {
#[async_trait::async_trait]
impl Sink for ClickHouseSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = ClickHouseSinkWriter;
type LogSinker = LogSinkerOf<ClickHouseSinkWriter>;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
// For upsert clickhouse sink, the primary key must be defined.
Expand Down Expand Up @@ -206,14 +207,15 @@ impl Sink for ClickHouseSink {
Ok(())
}

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(ClickHouseSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await?)
.await?
.into_log_sinker(writer_param.sink_metrics))
}
}
pub struct ClickHouseSinkWriter {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
use tracing::warn;

use crate::sink::{Result, SinkError, SinkParam, SinkWriter};
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkError, SinkParam};

pub struct CoordinatedSinkWriter<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
epoch: u64,
Expand Down
11 changes: 6 additions & 5 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use serde_derive::Deserialize;
use serde_json::Value;

use super::{
Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::deserialize_bool_from_string;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig};
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
Expand Down Expand Up @@ -218,14 +218,14 @@ impl IcebergSink {
#[async_trait::async_trait]
impl Sink for IcebergSink {
type Coordinator = IcebergSinkCommitter;
type Writer = CoordinatedSinkWriter<IcebergWriter>;
type LogSinker = LogSinkerOf<CoordinatedSinkWriter<IcebergWriter>>;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
let _ = self.create_table().await?;
Ok(())
}

async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let table = self.create_table().await?;

let inner = IcebergWriter {
Expand All @@ -250,7 +250,8 @@ impl Sink for IcebergSink {
})?,
inner,
)
.await?)
.await?
.into_log_sinker(writer_param.sink_metrics))
}

async fn new_coordinator(
Expand Down
16 changes: 9 additions & 7 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ use super::formatter::{
AppendOnlyFormatter, DebeziumAdapterOpts, DebeziumJsonFormatter, UpsertFormatter,
};
use super::{
FormattedSink, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT,
};
use crate::common::KafkaCommon;
use crate::sink::{
DummySinkCommitCoordinator, Result, SinkWriterParam, SinkWriterV1, SinkWriterV1Adapter,
use crate::sink::writer::{
FormattedSink, LogSinkerOf, SinkWriterExt, SinkWriterV1, SinkWriterV1Adapter,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
Expand Down Expand Up @@ -281,9 +282,9 @@ impl KafkaSink {
#[async_trait::async_trait]
impl Sink for KafkaSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = SinkWriterV1Adapter<KafkaSinkWriter>;
type LogSinker = LogSinkerOf<SinkWriterV1Adapter<KafkaSinkWriter>>;

async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(SinkWriterV1Adapter::new(
KafkaSinkWriter::new(
self.config.clone(),
Expand All @@ -295,7 +296,8 @@ impl Sink for KafkaSink {
format!("sink-{:?}", writer_param.executor_id),
)
.await?,
))
)
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
Expand Down
16 changes: 9 additions & 7 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ use tokio_retry::Retry;
use super::formatter::{
AppendOnlyFormatter, DebeziumAdapterOpts, DebeziumJsonFormatter, UpsertFormatter,
};
use super::{FormattedSink, SinkParam};
use super::SinkParam;
use crate::common::KinesisCommon;
use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriter, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

pub const KINESIS_SINK: &str = "kinesis";
Expand Down Expand Up @@ -67,7 +68,7 @@ impl KinesisSink {
#[async_trait::async_trait]
impl Sink for KinesisSink {
type Coordinator = DummySinkCommitCoordinator;
type Writer = KinesisSinkWriter;
type LogSinker = LogSinkerOf<KinesisSinkWriter>;

async fn validate(&self, _client: Option<ConnectorClient>) -> Result<()> {
// For upsert Kafka sink, the primary key must be defined.
Expand All @@ -92,16 +93,17 @@ impl Sink for KinesisSink {
Ok(())
}

async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
KinesisSinkWriter::new(
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(KinesisSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await
.await?
.into_log_sinker(writer_param.sink_metrics))
}
}

Expand Down
Loading
Loading