Skip to content

Commit

Permalink
feat(log-store): refine log store trait and add update vnode bitmap i…
Browse files Browse the repository at this point in the history
…n reader (#11959)
  • Loading branch information
wenym1 authored Sep 6, 2023
1 parent e64a075 commit da89875
Show file tree
Hide file tree
Showing 18 changed files with 371 additions and 480 deletions.
3 changes: 2 additions & 1 deletion src/connector/src/sink/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::fmt::{Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
Expand Down Expand Up @@ -54,7 +55,7 @@ impl<CM: 'static + Send> SinkWriter for BoxWriter<CM> {
self.deref_mut().abort().await
}

async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> crate::sink::Result<()> {
async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> crate::sink::Result<()> {
self.deref_mut().update_vnode_bitmap(vnode_bitmap).await
}
}
Expand Down
9 changes: 0 additions & 9 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use anyhow::anyhow;
use clickhouse::{Client, Row as ClickHouseRow};
use itertools::Itertools;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl, Serial};
Expand Down Expand Up @@ -439,14 +438,6 @@ impl SinkWriter for ClickHouseSinkWriter {
async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
Ok(())
}
}

#[derive(ClickHouseRow, Deserialize)]
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::anyhow;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
Expand Down Expand Up @@ -81,7 +83,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
self.inner.abort().await
}

async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()> {
async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> Result<()> {
self.inner.update_vnode_bitmap(vnode_bitmap).await
}
}
6 changes: 0 additions & 6 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,6 @@ impl SinkWriter for IcebergWriter {
// TODO: abort should clean up all the data written in this epoch.
Ok(())
}

/// Update the vnode bitmap of current sink writer
async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
// Just skip it.
Ok(())
}
}

pub struct IcebergSinkCommitter {
Expand Down
9 changes: 0 additions & 9 deletions src/connector/src/sink/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::Client as KinesisClient;
use futures_async_stream::for_await;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
Expand Down Expand Up @@ -272,14 +271,6 @@ impl SinkWriter for KinesisSinkWriter {
async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
Ok(())
}
}

#[macro_export]
Expand Down
21 changes: 7 additions & 14 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod test_sink;
pub mod utils;

use std::collections::HashMap;
use std::sync::Arc;

use ::clickhouse::error::Error as ClickHouseError;
use anyhow::anyhow;
Expand Down Expand Up @@ -173,10 +174,14 @@ pub trait SinkWriter: Send + 'static {
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;

/// Clean up
async fn abort(&mut self) -> Result<()>;
async fn abort(&mut self) -> Result<()> {
Ok(())
}

/// Update the vnode bitmap of current sink writer
async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()>;
async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -240,10 +245,6 @@ impl<W: SinkWriterV1> SinkWriter for SinkWriterV1Adapter<W> {
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -314,17 +315,9 @@ impl SinkWriter for BlackHoleSink {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
Ok(())
}
}

impl SinkConfig {
Expand Down
9 changes: 0 additions & 9 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::collections::HashMap;
use anyhow::anyhow;
use async_nats::jetstream::context::Context;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
use risingwave_rpc_client::ConnectorClient;
Expand Down Expand Up @@ -155,12 +154,4 @@ impl SinkWriter for NatsSinkWriter {
async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
Ok(())
}
}
9 changes: 0 additions & 9 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_rpc_client::ConnectorClient;

Expand Down Expand Up @@ -58,15 +57,7 @@ impl SinkWriter for RedisSinkWriter {
todo!()
}

async fn abort(&mut self) -> Result<()> {
todo!()
}

async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
todo!()
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
todo!()
}
}
10 changes: 0 additions & 10 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use async_trait::async_trait;
use itertools::Itertools;
use prost::Message;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::error::anyhow_error;
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -409,15 +408,6 @@ where
Ok(<Self as HandleBarrierResponse>::non_checkpoint_return_value())
}
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Bitmap) -> Result<()> {
// TODO: handle scaling
Ok(())
}
}

pub struct RemoteCoordinator {
Expand Down
Loading

0 comments on commit da89875

Please sign in to comment.