Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): use 'create sink ... format ... encode' to create redis sink #13003

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
);
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

CREATE SINK bhv_redis_sink_2
FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
redis.keyformat='user_id:{user_id}',
redis.valueformat='username:{username},event_timestamp{event_timestamp}'
);
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', redis_key_format = 'UserID:{user_id}', redis_value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ enum EncodeType {
ENCODE_TYPE_PROTOBUF = 4;
ENCODE_TYPE_JSON = 5;
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
}

enum RowFormatType {
Expand Down
11 changes: 8 additions & 3 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,15 @@ pub enum SinkEncode {
Json,
Protobuf,
Avro,
Template,
}

impl SinkFormatDesc {
pub fn from_legacy_type(connector: &str, r#type: &str) -> Result<Option<Self>, SinkError> {
use crate::sink::kafka::KafkaSink;
use crate::sink::kinesis::KinesisSink;
use crate::sink::pulsar::PulsarSink;
use crate::sink::redis::RedisSink;
use crate::sink::Sink as _;

let format = match r#type {
Expand All @@ -153,9 +155,10 @@ impl SinkFormatDesc {
}
};
let encode = match connector {
KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
SinkEncode::Json
}
KafkaSink::SINK_NAME
| KinesisSink::SINK_NAME
| PulsarSink::SINK_NAME
| RedisSink::SINK_NAME => SinkEncode::Json,
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
_ => return Ok(None),
};
Ok(Some(Self {
Expand All @@ -177,6 +180,7 @@ impl SinkFormatDesc {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
};
let options = self
.options
Expand Down Expand Up @@ -212,6 +216,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(SinkError::Config(anyhow!(
Expand Down
80 changes: 49 additions & 31 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub use upsert::UpsertFormatter;
use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::redis::{REDIS_KEY_FORMAT, REDIS_VALUE_FORMAT};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
Expand Down Expand Up @@ -115,6 +116,9 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => Err(SinkError::Config(anyhow!(
"Template only support with redis sink"
))),
}
}
SinkFormat::Debezium => {
Expand Down Expand Up @@ -171,45 +175,59 @@ impl SinkFormatterImpl {
pub fn new_with_redis(
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
key_format: Option<String>,
value_format: Option<String>,
format_desc: &SinkFormatDesc,
) -> Result<Self> {
match (key_format, value_format) {
(Some(k), Some(v)) => {
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
k,
);
let val_encoder =
TemplateEncoder::new(schema, None, v);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyTemplate(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(key_encoder, val_encoder)))
}
}
(None, None) => {
match format_desc.encode {
SinkEncode::Json => {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
TimestampHandlingMode::String,
);
let val_encoder = JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(key_encoder, val_encoder)))
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String);
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(
key_encoder,
val_encoder,
))),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
}
}
_ => {
Err(SinkError::Encode("Please provide template formats for both key and value, or choose the JSON format.".to_string()))
SinkEncode::Template => {
let key_format = format_desc.options.get(REDIS_KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(REDIS_VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder =
TemplateEncoder::new(schema.clone(), Some(pk_indices), key_format.clone());
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertTemplate(
UpsertFormatter::new(key_encoder, val_encoder),
)),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
}
}
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Json and Template"
))),
}
}
}
Expand Down
Loading
Loading