Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Oct 23, 2023
1 parent c8d0223 commit de0df1e
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 129 deletions.
2 changes: 1 addition & 1 deletion integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ FROM
primary_key = 'user_id',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', redis_key_format = 'UserID:{user_id}', redis_value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
8 changes: 3 additions & 5 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl SinkFormatDesc {
use crate::sink::kafka::KafkaSink;
use crate::sink::kinesis::KinesisSink;
use crate::sink::pulsar::PulsarSink;
use crate::sink::redis::RedisSink;
use crate::sink::Sink as _;

let format = match r#type {
Expand All @@ -155,10 +154,9 @@ impl SinkFormatDesc {
}
};
let encode = match connector {
KafkaSink::SINK_NAME
| KinesisSink::SINK_NAME
| PulsarSink::SINK_NAME
| RedisSink::SINK_NAME => SinkEncode::Json,
KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
SinkEncode::Json
}
_ => return Ok(None),
};
Ok(Some(Self {
Expand Down
174 changes: 80 additions & 94 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use upsert::UpsertFormatter;
use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::redis::{REDIS_KEY_FORMAT, REDIS_VALUE_FORMAT};
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
Expand Down Expand Up @@ -93,7 +93,7 @@ impl SinkFormatterImpl {
let key_encoder = (!pk_indices.is_empty()).then(|| {
JsonEncoder::new(
schema.clone(),
Some(pk_indices),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
)
});
Expand All @@ -116,9 +116,28 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => Err(SinkError::Config(anyhow!(
"Template only support with redis sink"
))),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
))
}
}
}
SinkFormat::Debezium => {
Expand All @@ -135,99 +154,66 @@ impl SinkFormatterImpl {
)))
}
SinkFormat::Upsert => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}
match format_desc.encode {
SinkEncode::Json => {
let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder =
key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
}
}

pub fn new_with_redis(
schema: Schema,
pk_indices: Vec<usize>,
format_desc: &SinkFormatDesc,
) -> Result<Self> {
match format_desc.encode {
SinkEncode::Json => {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::String,
);
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String);
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(
key_encoder,
val_encoder,
))),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
}
}
SinkEncode::Template => {
let key_format = format_desc.options.get(REDIS_KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(REDIS_VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder =
TemplateEncoder::new(schema.clone(), Some(pk_indices), key_format.clone());
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertTemplate(
UpsertFormatter::new(key_encoder, val_encoder),
)),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(
key_encoder,
val_encoder,
)))
}
_ => err_unsupported(),
}
}
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Json and Template"
))),
}
}
}
Expand Down
74 changes: 45 additions & 29 deletions src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

pub const REDIS_SINK: &str = "redis";
pub const REDIS_KEY_FORMAT: &str = "redis_key_format";
pub const REDIS_VALUE_FORMAT: &str = "redis_value_format";
pub const KEY_FORMAT: &str = "key_format";
pub const VALUE_FORMAT: &str = "value_format";
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RedisCommon {
#[serde(rename = "redis.url")]
Expand Down Expand Up @@ -69,6 +69,8 @@ pub struct RedisSink {
schema: Schema,
pk_indices: Vec<usize>,
format_desc: SinkFormatDesc,
db_name: String,
sink_from_name: String,
}

fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
Expand Down Expand Up @@ -107,6 +109,8 @@ impl TryFrom<SinkParam> for RedisSink {
format_desc: param
.format_desc
.ok_or_else(|| SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")))?,
db_name: param.db_name,
sink_from_name: param.sink_from_name,
})
}
}
Expand All @@ -123,6 +127,8 @@ impl Sink for RedisSink {
self.schema.clone(),
self.pk_indices.clone(),
&self.format_desc,
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(writer_param.sink_metrics))
Expand All @@ -149,24 +155,16 @@ impl Sink for RedisSink {
self.format_desc.encode,
super::catalog::SinkEncode::Template
) {
let key_format = self
.format_desc
.options
.get(REDIS_KEY_FORMAT)
.ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_key_format',please set it or use JSON"
))
})?;
let value_format = self
.format_desc
.options
.get(REDIS_VALUE_FORMAT)
.ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_format = self.format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format = self.format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'value_format',please set it or use JSON"
))
})?;
check_string_format(key_format, &pk_set)?;
check_string_format(value_format, &all_set)?;
}
Expand Down Expand Up @@ -231,10 +229,18 @@ impl RedisSinkWriter {
schema: Schema,
pk_indices: Vec<usize>,
format_desc: &SinkFormatDesc,
db_name: String,
sink_from_name: String,
) -> Result<Self> {
let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
let formatter =
SinkFormatterImpl::new_with_redis(schema.clone(), pk_indices.clone(), format_desc)?;
let formatter = SinkFormatterImpl::new(
format_desc,
schema.clone(),
pk_indices.clone(),
db_name,
sink_from_name,
)
.await?;

Ok(Self {
schema,
Expand All @@ -246,13 +252,19 @@ impl RedisSinkWriter {
}

#[cfg(test)]
pub fn mock(
pub async fn mock(
schema: Schema,
pk_indices: Vec<usize>,
format_desc: &SinkFormatDesc,
) -> Result<Self> {
let formatter =
SinkFormatterImpl::new_with_redis(schema.clone(), pk_indices.clone(), format_desc)?;
let formatter = SinkFormatterImpl::new(
format_desc,
schema.clone(),
pk_indices.clone(),
"d1".to_string(),
"t1".to_string(),
)
.await?;
Ok(Self {
schema,
pk_indices,
Expand Down Expand Up @@ -320,7 +332,9 @@ mod test {
options: BTreeMap::default(),
};

let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap();
let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
.await
.unwrap();

let chunk_a = StreamChunk::new(
vec![Op::Insert, Op::Insert, Op::Insert],
Expand Down Expand Up @@ -372,9 +386,9 @@ mod test {
]);

let mut btree_map = BTreeMap::default();
btree_map.insert(REDIS_KEY_FORMAT.to_string(), "key-{id}".to_string());
btree_map.insert(KEY_FORMAT.to_string(), "key-{id}".to_string());
btree_map.insert(
REDIS_VALUE_FORMAT.to_string(),
VALUE_FORMAT.to_string(),
"values:{id:{id},name:{name}}".to_string(),
);
let format_desc = SinkFormatDesc {
Expand All @@ -383,7 +397,9 @@ mod test {
options: btree_map,
};

let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc).unwrap();
let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
.await
.unwrap();

let chunk_a = StreamChunk::new(
vec![Op::Insert, Op::Insert, Op::Insert],
Expand Down

0 comments on commit de0df1e

Please sign in to comment.