Commit

use format_desc in kafka/kinesis/pulsar

xiangjinwu committed Sep 29, 2023
1 parent fd80837 commit c219b4a
Showing 5 changed files with 85 additions and 146 deletions.
9 changes: 8 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
@@ -130,6 +130,11 @@ pub enum SinkEncode {
 
 impl SinkFormatDesc {
     pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
+        use crate::sink::kafka::KafkaSink;
+        use crate::sink::kinesis::KinesisSink;
+        use crate::sink::pulsar::PulsarSink;
+        use crate::sink::Sink as _;
+
         let format = match r#type {
             SINK_TYPE_APPEND_ONLY => SinkFormat::AppendOnly,
             SINK_TYPE_UPSERT => SinkFormat::Upsert,
@@ -142,7 +147,9 @@ impl SinkFormatDesc {
             }
         };
         let encode = match connector {
-            "kafka" => SinkEncode::Json,
+            KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
+                SinkEncode::Json
+            }
             _ => return Ok(None),
         };
         Ok(Some(Self {
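
To make the new mapping concrete, here is a hedged usage sketch of `from_legacy_type`. It assumes the `SINK_NAME` constants above resolve to the legacy connector strings ("kafka", "kinesis", "pulsar") and that `SinkFormatDesc` exposes the `format` and `encode` fields seen elsewhere in this commit; everything else is illustrative.

    // Hypothetical call site, not part of this commit.
    let desc = SinkFormatDesc::from_legacy_type("kinesis", "upsert")?
        .expect("kinesis is a recognized legacy connector");
    assert_eq!(desc.format, SinkFormat::Upsert);
    assert_eq!(desc.encode, SinkEncode::Json); // all three legacy sinks are JSON-only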
76 changes: 41 additions & 35 deletions src/connector/src/sink/formatter/mod.rs
@@ -15,7 +15,7 @@
 use anyhow::anyhow;
 use risingwave_common::array::StreamChunk;
 
-use crate::sink::{Result, SinkError, SINK_TYPE_DEBEZIUM, SINK_TYPE_UPSERT};
+use crate::sink::{Result, SinkError};
 
 mod append_only;
 mod debezium_json;
@@ -26,6 +26,7 @@ pub use debezium_json::{DebeziumAdapterOpts, DebeziumJsonFormatter};
 use risingwave_common::catalog::Schema;
 pub use upsert::UpsertFormatter;
 
+use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
 use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
 
 /// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
@@ -64,47 +65,52 @@ pub enum SinkFormatterImpl {
 
 impl SinkFormatterImpl {
     pub fn new(
-        formatter_type: &str,
+        format_desc: &SinkFormatDesc,
         schema: Schema,
         pk_indices: Vec<usize>,
-        is_append_only: bool,
         db_name: String,
         sink_from_name: String,
     ) -> Result<Self> {
-        if is_append_only {
-            let key_encoder = JsonEncoder::new(
-                schema.clone(),
-                Some(pk_indices),
-                TimestampHandlingMode::Milli,
-            );
-            let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
+        if format_desc.encode != SinkEncode::Json {
+            return Err(SinkError::Config(anyhow!(
+                "sink encode unsupported: {:?}",
+                format_desc.encode,
+            )));
+        }
 
-            let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
-            Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
-        } else if formatter_type == SINK_TYPE_DEBEZIUM {
-            Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new(
-                schema,
-                pk_indices,
-                db_name,
-                sink_from_name,
-                DebeziumAdapterOpts::default(),
-            )))
-        } else if formatter_type == SINK_TYPE_UPSERT {
-            let key_encoder = JsonEncoder::new(
-                schema.clone(),
-                Some(pk_indices),
-                TimestampHandlingMode::Milli,
-            );
-            let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
+        match format_desc.format {
+            SinkFormat::AppendOnly => {
+                let key_encoder = JsonEncoder::new(
+                    schema.clone(),
+                    Some(pk_indices),
+                    TimestampHandlingMode::Milli,
+                );
+                let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
 
-            // Initialize the upsert_stream
-            let formatter = UpsertFormatter::new(key_encoder, val_encoder);
-            Ok(SinkFormatterImpl::UpsertJson(formatter))
-        } else {
-            Err(SinkError::Config(anyhow!(
-                "unsupported upsert sink type {}",
-                formatter_type
-            )))
+                let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
+                Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
+            }
+            SinkFormat::Debezium => {
+                Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new(
+                    schema,
+                    pk_indices,
+                    db_name,
+                    sink_from_name,
+                    DebeziumAdapterOpts::default(),
+                )))
+            }
+            SinkFormat::Upsert => {
+                let key_encoder = JsonEncoder::new(
+                    schema.clone(),
+                    Some(pk_indices),
+                    TimestampHandlingMode::Milli,
+                );
+                let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
+
+                // Initialize the upsert_stream
+                let formatter = UpsertFormatter::new(key_encoder, val_encoder);
+                Ok(SinkFormatterImpl::UpsertJson(formatter))
+            }
         }
     }
 }
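
The rewritten constructor front-loads a single encode check and then dispatches on the format enum. A minimal sketch of why this beats the old string comparison (the function below is illustrative, not from the codebase): a `match` on `SinkFormat` must be exhaustive, so adding a new variant becomes a compile error at every dispatch site instead of a runtime `SinkError`.

    // Illustrative only: exhaustiveness is enforced by the compiler.
    fn describe(format: SinkFormat) -> &'static str {
        match format {
            SinkFormat::AppendOnly => "append-only",
            SinkFormat::Upsert => "upsert",
            SinkFormat::Debezium => "debezium",
            // No `_` arm: a future variant fails compilation until handled.
        }
    }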
58 changes: 10 additions & 48 deletions src/connector/src/sink/kafka.rs
@@ -31,10 +31,8 @@ use serde_derive::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use strum_macros::{Display, EnumString};
 
-use super::{
-    Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
-    SINK_TYPE_UPSERT,
-};
+use super::catalog::{SinkFormat, SinkFormatDesc};
+use super::{Sink, SinkError, SinkParam};
 use crate::common::KafkaCommon;
 use crate::sink::formatter::SinkFormatterImpl;
 use crate::sink::writer::{
@@ -66,10 +64,6 @@ const fn _default_use_transaction() -> bool {
     false
 }
 
-const fn _default_force_append_only() -> bool {
-    false
-}
-
 #[derive(Debug, Clone, PartialEq, Display, Serialize, Deserialize, EnumString)]
 #[strum(serialize_all = "snake_case")]
 enum CompressionCodec {
@@ -185,14 +179,6 @@ pub struct KafkaConfig {
     #[serde(flatten)]
     pub common: KafkaCommon,
 
-    pub r#type: String, // accept "append-only", "debezium", or "upsert"
-
-    #[serde(
-        default = "_default_force_append_only",
-        deserialize_with = "deserialize_bool_from_string"
-    )]
-    pub force_append_only: bool,
-
     #[serde(
         rename = "properties.timeout",
         default = "_default_timeout",
@@ -234,18 +220,6 @@ impl KafkaConfig {
         let config = serde_json::from_value::<KafkaConfig>(serde_json::to_value(values).unwrap())
             .map_err(|e| SinkError::Config(anyhow!(e)))?;
 
-        if config.r#type != SINK_TYPE_APPEND_ONLY
-            && config.r#type != SINK_TYPE_DEBEZIUM
-            && config.r#type != SINK_TYPE_UPSERT
-        {
-            return Err(SinkError::Config(anyhow!(
-                "`{}` must be {}, {}, or {}",
-                SINK_TYPE_OPTION,
-                SINK_TYPE_APPEND_ONLY,
-                SINK_TYPE_DEBEZIUM,
-                SINK_TYPE_UPSERT
-            )));
-        }
         Ok(config)
     }
 
@@ -277,7 +251,7 @@ pub struct KafkaSink {
     pub config: KafkaConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
-    is_append_only: bool,
+    format_desc: SinkFormatDesc,
     db_name: String,
     sink_from_name: String,
 }
@@ -292,7 +266,9 @@ impl TryFrom<SinkParam> for KafkaSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
-            is_append_only: param.sink_type.is_append_only(),
+            format_desc: param
+                .format_desc
+                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
             db_name: param.db_name,
             sink_from_name: param.sink_from_name,
         })
@@ -310,10 +286,9 @@ impl Sink for KafkaSink {
         KafkaSinkWriter::new(
             self.config.clone(),
             SinkFormatterImpl::new(
-                &self.config.r#type,
+                &self.format_desc,
                 self.schema.clone(),
                 self.pk_indices.clone(),
-                self.is_append_only,
                 self.db_name.clone(),
                 self.sink_from_name.clone(),
             )?,
@@ -325,10 +300,10 @@
 
     async fn validate(&self) -> Result<()> {
         // For upsert Kafka sink, the primary key must be defined.
-        if !self.is_append_only && self.pk_indices.is_empty() {
+        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
             return Err(SinkError::Config(anyhow!(
-                "primary key not defined for {} kafka sink (please define in `primary_key` field)",
-                self.config.r#type
+                "primary key not defined for {:?} kafka sink (please define in `primary_key` field)",
+                self.format_desc.format
             )));
         }
 
@@ -661,8 +636,6 @@ mod test {
         let config = KafkaConfig::from_hashmap(properties).unwrap();
         assert_eq!(config.common.brokers, "localhost:9092");
         assert_eq!(config.common.topic, "test");
-        assert_eq!(config.r#type, "append-only");
-        assert!(config.force_append_only);
         assert!(!config.use_transaction);
         assert_eq!(config.timeout, Duration::from_secs(10));
         assert_eq!(config.max_retry_num, 20);
@@ -676,7 +649,6 @@
             "type".to_string() => "upsert".to_string(),
         };
         let config = KafkaConfig::from_hashmap(properties).unwrap();
-        assert!(!config.force_append_only);
         assert!(!config.use_transaction);
         assert_eq!(config.timeout, Duration::from_secs(5));
         assert_eq!(config.max_retry_num, 3);
@@ -692,16 +664,6 @@
         };
         assert!(KafkaConfig::from_hashmap(properties).is_err());
 
-        // Invalid bool input.
-        let properties: HashMap<String, String> = hashmap! {
-            "connector".to_string() => "kafka".to_string(),
-            "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
-            "topic".to_string() => "test".to_string(),
-            "type".to_string() => "upsert".to_string(),
-            "force_append_only".to_string() => "yes".to_string(), // error!
-        };
-        assert!(KafkaConfig::from_hashmap(properties).is_err());
-
         // Invalid duration input.
         let properties: HashMap<String, String> = hashmap! {
             "connector".to_string() => "kafka".to_string(),
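
With `type` and `force_append_only` removed from `KafkaConfig`, the properties map now describes only the connector itself; format and encode arrive separately as a `SinkFormatDesc`. A hedged sketch of the slimmer config path (note the surviving tests above still pass a `type` key, which the deserializer evidently ignores rather than rejects):

    // Sketch: from_hashmap no longer performs sink-type validation.
    let properties: HashMap<String, String> = hashmap! {
        "connector".to_string() => "kafka".to_string(),
        "properties.bootstrap.server".to_string() => "localhost:9092".to_string(),
        "topic".to_string() => "test".to_string(),
    };
    let config = KafkaConfig::from_hashmap(properties)?;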
46 changes: 13 additions & 33 deletions src/connector/src/sink/kinesis.rs
@@ -26,15 +26,13 @@ use serde_with::serde_as;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use tokio_retry::Retry;
 
+use super::catalog::{SinkFormat, SinkFormatDesc};
 use super::SinkParam;
 use crate::common::KinesisCommon;
 use crate::dispatch_sink_formatter_impl;
 use crate::sink::formatter::SinkFormatterImpl;
 use crate::sink::writer::{FormattedSink, LogSinkerOf, SinkWriter, SinkWriterExt};
-use crate::sink::{
-    DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY,
-    SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
-};
+use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkWriterParam};
 
 pub const KINESIS_SINK: &str = "kinesis";
 
@@ -43,7 +41,7 @@ pub struct KinesisSink {
     pub config: KinesisSinkConfig,
     schema: Schema,
     pk_indices: Vec<usize>,
-    is_append_only: bool,
+    format_desc: SinkFormatDesc,
     db_name: String,
     sink_from_name: String,
 }
@@ -58,7 +56,9 @@ impl TryFrom<SinkParam> for KinesisSink {
             config,
             schema,
             pk_indices: param.downstream_pk,
-            is_append_only: param.sink_type.is_append_only(),
+            format_desc: param
+                .format_desc
+                .ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
             db_name: param.db_name,
            sink_from_name: param.sink_from_name,
         })
@@ -73,10 +73,10 @@ impl Sink for KinesisSink {
 
     async fn validate(&self) -> Result<()> {
         // For upsert Kinesis sink, the primary key must be defined.
-        if !self.is_append_only && self.pk_indices.is_empty() {
+        if self.format_desc.format != SinkFormat::AppendOnly && self.pk_indices.is_empty() {
             return Err(SinkError::Config(anyhow!(
-                "primary key not defined for {} kinesis sink (please define in `primary_key` field)",
-                self.config.r#type
+                "primary key not defined for {:?} kinesis sink (please define in `primary_key` field)",
+                self.format_desc.format
             )));
         }
 
@@ -99,7 +99,7 @@ impl Sink for KinesisSink {
             self.config.clone(),
             self.schema.clone(),
             self.pk_indices.clone(),
-            self.is_append_only,
+            &self.format_desc,
             self.db_name.clone(),
             self.sink_from_name.clone(),
         )
@@ -113,27 +113,13 @@
 pub struct KinesisSinkConfig {
     #[serde(flatten)]
     pub common: KinesisCommon,
-
-    pub r#type: String, // accept "append-only", "debezium", or "upsert"
 }
 
 impl KinesisSinkConfig {
     pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
         let config =
             serde_json::from_value::<KinesisSinkConfig>(serde_json::to_value(properties).unwrap())
                 .map_err(|e| SinkError::Config(anyhow!(e)))?;
-        if config.r#type != SINK_TYPE_APPEND_ONLY
-            && config.r#type != SINK_TYPE_DEBEZIUM
-            && config.r#type != SINK_TYPE_UPSERT
-        {
-            return Err(SinkError::Config(anyhow!(
-                "`{}` must be {}, {}, or {}",
-                SINK_TYPE_OPTION,
-                SINK_TYPE_APPEND_ONLY,
-                SINK_TYPE_DEBEZIUM,
-                SINK_TYPE_UPSERT
-            )));
-        }
         Ok(config)
     }
 }
@@ -154,18 +140,12 @@ impl KinesisSinkWriter {
         config: KinesisSinkConfig,
         schema: Schema,
         pk_indices: Vec<usize>,
-        is_append_only: bool,
+        format_desc: &SinkFormatDesc,
         db_name: String,
         sink_from_name: String,
     ) -> Result<Self> {
-        let formatter = SinkFormatterImpl::new(
-            &config.r#type,
-            schema,
-            pk_indices,
-            is_append_only,
-            db_name,
-            sink_from_name,
-        )?;
+        let formatter =
+            SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name)?;
         let client = config
             .common
             .build_client()
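
Note that `KinesisSinkWriter::new` now borrows a `&SinkFormatDesc` instead of owning an `is_append_only` flag: the sink keeps the descriptor and lends it at writer-construction time. A hedged sketch of that call shape, mirroring the `new_log_sinker` hunk above:

    // Sketch: the sink owns the descriptor; the writer only reads it.
    let writer = KinesisSinkWriter::new(
        self.config.clone(),
        self.schema.clone(),
        self.pk_indices.clone(),
        &self.format_desc, // read by SinkFormatterImpl::new for format + encode
        self.db_name.clone(),
        self.sink_from_name.clone(),
    )?; // the real constructor may be async; any .await is elided here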
