Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sink): unify log write path and distinguish table sink from external sink #14260

Merged
merged 12 commits into from
Jan 2, 2024
1 change: 1 addition & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/sink_into_table.slt'
sleep 1

echo "--- testing remote sinks"
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ SET RW_IMPLICIT_FLUSH TO true;
statement ok
create table t_simple (v1 int, v2 int);

statement error unsupported sink type table
create sink table_sink from t_simple with (connector = 'table');

statement ok
create table m_simple (v1 int primary key, v2 int);

Expand Down
1 change: 0 additions & 1 deletion proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
string error_message = 1;
}

message SinkParam {

Check failure on line 22 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "8" with name "target_table" on message "SinkParam" was deleted without reserving the number "8".
uint32 sink_id = 1;
map<string, string> properties = 2;
TableSchema table_schema = 3;
Expand All @@ -27,7 +27,6 @@
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
optional uint32 target_table = 8;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`SinkParam` is only used in RPC, so the proto breaking change can be ignored.

}

enum SinkPayloadFormat {
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ pub trait LogReader: Send + Sized + 'static {
) -> impl Future<Output = LogStoreResult<(bool, Option<Bitmap>)>> + Send + '_;
}

pub trait LogStoreFactory: 'static {
type Reader: LogReader + Send + 'static;
type Writer: LogWriter + Send + 'static;
/// Factory that produces the reader/writer pair of a log store.
///
/// The factory itself must be `Send + 'static`; the bounds required of the
/// two associated types come from the `LogReader` and `LogWriter` traits.
pub trait LogStoreFactory: Send + 'static {
/// Reader half produced by [`LogStoreFactory::build`].
type Reader: LogReader;
/// Writer half produced by [`LogStoreFactory::build`].
type Writer: LogWriter;

/// Consumes the factory and resolves to a `(Reader, Writer)` pair.
/// The returned future is required to be `Send`.
fn build(self) -> impl Future<Output = (Self::Reader, Self::Writer)> + Send;
}
Expand Down
29 changes: 12 additions & 17 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

pub mod big_query;
pub mod blackhole;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
Expand All @@ -32,8 +31,8 @@ pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod starrocks;
pub mod table;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

Expand All @@ -46,7 +45,7 @@ use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::error::{anyhow_error, ErrorCode, RwError};
use risingwave_common::metrics::{
LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
Expand All @@ -62,7 +61,6 @@ use self::catalog::{SinkFormatDesc, SinkType};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::table::TABLE_SINK;
use crate::sink::writer::SinkWriter;
use crate::ConnectorParams;

Expand All @@ -74,7 +72,7 @@ macro_rules! for_all_sinks {
{ Redis, $crate::sink::redis::RedisSink },
{ Kafka, $crate::sink::kafka::KafkaSink },
{ Pulsar, $crate::sink::pulsar::PulsarSink },
{ BlackHole, $crate::sink::blackhole::BlackHoleSink },
{ BlackHole, $crate::sink::trivial::BlackHoleSink },
{ Kinesis, $crate::sink::kinesis::KinesisSink },
{ ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
{ Iceberg, $crate::sink::iceberg::IcebergSink },
Expand All @@ -88,7 +86,7 @@ macro_rules! for_all_sinks {
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
{ Table, $crate::sink::table::TableSink }
{ Table, $crate::sink::trivial::TableSink }
}
$(,$arg)*
}
Expand Down Expand Up @@ -149,7 +147,6 @@ pub struct SinkParam {
pub format_desc: Option<SinkFormatDesc>,
pub db_name: String,
pub sink_from_name: String,
pub target_table: Option<TableId>,
}

impl SinkParam {
Expand Down Expand Up @@ -181,7 +178,6 @@ impl SinkParam {
format_desc,
db_name: pb_param.db_name,
sink_from_name: pb_param.sink_from_name,
target_table: pb_param.target_table.map(TableId::new),
}
}

Expand All @@ -197,7 +193,6 @@ impl SinkParam {
format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
target_table: self.target_table.map(|table_id| table_id.table_id()),
}
}

Expand All @@ -223,7 +218,6 @@ impl From<SinkCatalog> for SinkParam {
format_desc: sink_catalog.format_desc,
db_name: sink_catalog.db_name,
sink_from_name: sink_catalog.sink_from_name,
target_table: sink_catalog.target_table,
}
}
}
Expand Down Expand Up @@ -371,13 +365,10 @@ impl SinkImpl {
param.properties.remove(PRIVATE_LINK_TARGET_KEY);
param.properties.remove(CONNECTION_NAME_KEY);

let sink_type = if param.target_table.is_some() {
TABLE_SINK
} else {
param.properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| {
SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY))
})?
};
let sink_type = param
.properties
.get(CONNECTOR_TYPE_KEY)
.ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

match_sink_name_str!(
sink_type.to_lowercase().as_str(),
Expand All @@ -391,6 +382,10 @@ impl SinkImpl {
}
)
}

/// Returns `true` when this sink is the `SinkImpl::Table` variant,
/// and `false` for every other variant.
pub fn is_sink_into_table(&self) -> bool {
matches!(self, SinkImpl::Table(_))
}
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
Expand Down
76 changes: 0 additions & 76 deletions src/connector/src/sink/table.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use async_trait::async_trait;

use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
Expand All @@ -21,26 +23,49 @@ use crate::sink::{
};

pub const BLACKHOLE_SINK: &str = "blackhole";
pub const TABLE_SINK: &str = "table";

/// Compile-time name provider for a `TrivialSink<T>` instantiation.
///
/// `TrivialSink<T>` forwards `T::SINK_NAME` as its own `Sink::SINK_NAME`.
pub trait TrivialSinkName: Send + 'static {
/// Connector name reported by the sink parameterized with this marker.
const SINK_NAME: &'static str;
}

#[derive(Debug)]
pub struct BlackHoleSink;
pub struct BlackHoleSinkName;

impl TrivialSinkName for BlackHoleSinkName {
const SINK_NAME: &'static str = BLACKHOLE_SINK;
}

impl TryFrom<SinkParam> for BlackHoleSink {
pub type BlackHoleSink = TrivialSink<BlackHoleSinkName>;

/// Marker type that names the table sink; it carries no data.
#[derive(Debug)]
pub struct TableSinkName;

// Binds the marker to the `TABLE_SINK` connector name ("table").
impl TrivialSinkName for TableSinkName {
const SINK_NAME: &'static str = TABLE_SINK;
}

/// `TrivialSink` instantiated with the table-sink name marker.
pub type TableSink = TrivialSink<TableSinkName>;

/// Sink type generic over a name marker `T`.
///
/// The only field is a `PhantomData<T>`, so a value stores no runtime state;
/// `T` is used solely to select the `SINK_NAME` constant.
#[derive(Debug)]
pub struct TrivialSink<T: TrivialSinkName>(PhantomData<T>);

impl<T: TrivialSinkName> TryFrom<SinkParam> for TrivialSink<T> {
type Error = SinkError;

fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
Ok(Self)
Ok(Self(PhantomData))
}
}

impl Sink for BlackHoleSink {
impl<T: TrivialSinkName> Sink for TrivialSink<T> {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = Self;

const SINK_NAME: &'static str = BLACKHOLE_SINK;
const SINK_NAME: &'static str = T::SINK_NAME;

async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(Self)
Ok(Self(PhantomData))
}

async fn validate(&self) -> Result<()> {
Expand All @@ -49,7 +74,7 @@ impl Sink for BlackHoleSink {
}

#[async_trait]
impl LogSinker for BlackHoleSink {
impl<T: TrivialSinkName> LogSinker for TrivialSink<T> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<()> {
loop {
let (epoch, item) = log_reader.next_item().await?;
Expand Down
28 changes: 19 additions & 9 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::match_sink_name_str;
use risingwave_connector::sink::catalog::desc::SinkDesc;
use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType};
use risingwave_connector::sink::trivial::TABLE_SINK;
use risingwave_connector::sink::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION,
Expand Down Expand Up @@ -105,17 +106,26 @@ impl StreamSink {
format_desc,
)?;

let unsupported_sink =
|sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)));

// check and ensure that the sink connector is specified and supported
match sink.properties.get(CONNECTOR_TYPE_KEY) {
Some(connector) => match_sink_name_str!(
connector.to_lowercase().as_str(),
SinkType,
Ok(()),
|other| Err(SinkError::Config(anyhow!(
"unsupported sink type {}",
other
)))
)?,
Some(connector) => {
match_sink_name_str!(
connector.to_lowercase().as_str(),
SinkType,
{
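// Reject a table sink requested directly through the WITH properties
// (`connector = 'table'`) when no target table is set.
// NOTE(review): `connector` is compared here without lowercasing, unlike the
// match scrutinee above, so e.g. 'TABLE' may bypass this check -- confirm.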
if connector == TABLE_SINK && sink.target_table.is_none() {
unsupported_sink(TABLE_SINK)
} else {
Ok(())
}
},
|other: &str| unsupported_sink(other)
)?;
}
None => {
return Err(
SinkError::Config(anyhow!("connector not specified when create sink")).into(),
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ use crate::hummock::metrics_utils::{
trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
};
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
use crate::manager::{
ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, MetadataManager, TableId,
META_NODE_ID,
};
#[cfg(any(test, feature = "test"))]
use crate::manager::{ClusterManagerRef, FragmentManagerRef};
use crate::manager::{IdCategory, MetaSrvEnv, MetadataManager, TableId, META_NODE_ID};
use crate::model::{
BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction,
VarTransaction,
Expand Down
Loading
Loading