Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
xxhZs committed Oct 24, 2023
1 parent c8d0223 commit 712042e
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 150 deletions.
2 changes: 1 addition & 1 deletion integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ FROM
primary_key = 'user_id',
connector = 'redis',
redis.url= 'redis://127.0.0.1:6379/',
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', redis_key_format = 'UserID:{user_id}', redis_value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
8 changes: 3 additions & 5 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl SinkFormatDesc {
use crate::sink::kafka::KafkaSink;
use crate::sink::kinesis::KinesisSink;
use crate::sink::pulsar::PulsarSink;
use crate::sink::redis::RedisSink;
use crate::sink::Sink as _;

let format = match r#type {
Expand All @@ -155,10 +154,9 @@ impl SinkFormatDesc {
}
};
let encode = match connector {
KafkaSink::SINK_NAME
| KinesisSink::SINK_NAME
| PulsarSink::SINK_NAME
| RedisSink::SINK_NAME => SinkEncode::Json,
KafkaSink::SINK_NAME | KinesisSink::SINK_NAME | PulsarSink::SINK_NAME => {
SinkEncode::Json
}
_ => return Ok(None),
};
Ok(Some(Self {
Expand Down
22 changes: 22 additions & 0 deletions src/connector/src/sink/encoder/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use regex::Regex;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::ToText;

use super::{Result, RowEncoder};
use crate::sink::SinkError;

/// Encode a row according to a specified string template `user_id:{user_id}`
pub struct TemplateEncoder {
Expand All @@ -34,6 +38,24 @@ impl TemplateEncoder {
template,
}
}

/// Validates a template string such as `user_id:{user_id}` against a set of allowed names.
///
/// The text between each `{` and the next `}` must be exactly one entry of `set`
/// (the names are expected to be column names from rw). Nested or combined forms such
/// as `{{column_name}}` or `{{column_name1},{column_name2}}` are therefore incorrect:
/// the captured text would begin with `{` and would not be a plain column name.
///
/// # Errors
///
/// Returns `SinkError::Redis` when `format` contains no `{...}` placeholder at all, or
/// when the content of any placeholder is not contained in `set`.
pub fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
    // A placeholder is `{`, then any run of non-`}` characters (captured), then `}`.
    let placeholder = Regex::new(r"\{([^}]*)\}").unwrap();

    let mut placeholders = placeholder.captures_iter(format).peekable();
    // A template without a single placeholder is rejected up front.
    if placeholders.peek().is_none() {
        return Err(SinkError::Redis(
            "Can't find {} in key_format or value_format".to_string(),
        ));
    }

    // Look for the first placeholder, in order of appearance, whose content is unknown.
    let unknown = placeholders
        .filter_map(|caps| caps.get(1))
        .find(|name| !set.contains(name.as_str()));
    match unknown {
        Some(name) => Err(SinkError::Redis(format!(
            "Can't find field({:?}) in key_format or value_format",
            name.as_str()
        ))),
        None => Ok(()),
    }
}
}

impl RowEncoder for TemplateEncoder {
Expand Down
174 changes: 80 additions & 94 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use upsert::UpsertFormatter;
use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::redis::{REDIS_KEY_FORMAT, REDIS_VALUE_FORMAT};
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
Expand Down Expand Up @@ -93,7 +93,7 @@ impl SinkFormatterImpl {
let key_encoder = (!pk_indices.is_empty()).then(|| {
JsonEncoder::new(
schema.clone(),
Some(pk_indices),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
)
});
Expand All @@ -116,9 +116,28 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => Err(SinkError::Config(anyhow!(
"Template only support with redis sink"
))),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
))
}
}
}
SinkFormat::Debezium => {
Expand All @@ -135,99 +154,66 @@ impl SinkFormatterImpl {
)))
}
SinkFormat::Upsert => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}
match format_desc.encode {
SinkEncode::Json => {
let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder =
key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
}
}

pub fn new_with_redis(
schema: Schema,
pk_indices: Vec<usize>,
format_desc: &SinkFormatDesc,
) -> Result<Self> {
match format_desc.encode {
SinkEncode::Json => {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::String,
);
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::String);
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyJson(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(
key_encoder,
val_encoder,
))),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
}
}
SinkEncode::Template => {
let key_format = format_desc.options.get(REDIS_KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(REDIS_VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder =
TemplateEncoder::new(schema.clone(), Some(pk_indices), key_format.clone());
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
match format_desc.format {
SinkFormat::AppendOnly => Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
)),
SinkFormat::Upsert => Ok(SinkFormatterImpl::UpsertTemplate(
UpsertFormatter::new(key_encoder, val_encoder),
)),
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Append_Only and Upsert"
))),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(
key_encoder,
val_encoder,
)))
}
_ => err_unsupported(),
}
}
_ => Err(SinkError::Config(anyhow!(
"Redis sink only support Json and Template"
))),
}
}
}
Expand Down
Loading

0 comments on commit 712042e

Please sign in to comment.