Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): use 'create sink ... format ... encode' to create redis sink #13003

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
);
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

CREATE SINK bhv_redis_sink_2
FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
redis.keyformat='user_id:{user_id}',
redis.valueformat='username:{username},event_timestamp{event_timestamp}'
);
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ enum EncodeType {
ENCODE_TYPE_PROTOBUF = 4;
ENCODE_TYPE_JSON = 5;
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
}

enum RowFormatType {
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub enum SinkEncode {
Json,
Protobuf,
Avro,
Template,
}

impl SinkFormatDesc {
Expand Down Expand Up @@ -177,6 +178,7 @@ impl SinkFormatDesc {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
};
let options = self
.options
Expand Down Expand Up @@ -212,6 +214,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(SinkError::Config(anyhow!(
Expand Down
22 changes: 22 additions & 0 deletions src/connector/src/sink/encoder/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use regex::Regex;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::ToText;

use super::{Result, RowEncoder};
use crate::sink::SinkError;

/// Encode a row according to a specified string template `user_id:{user_id}`
pub struct TemplateEncoder {
Expand All @@ -34,6 +38,24 @@ impl TemplateEncoder {
template,
}
}

/// Validates a key/value template string against a set of known column names.
///
/// Every `{...}` placeholder found in `format` must contain exactly one name that is
/// present in `set`. Because the text between the braces is looked up verbatim, nested
/// or combined forms such as `{{column_name}}` or `{{column_name1},{column_name2}}`
/// are rejected.
///
/// # Errors
///
/// Returns `SinkError::Redis` when `format` contains no `{...}` placeholder at all, or
/// when the content of any placeholder is not a member of `set`.
pub fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
    let placeholder_re = Regex::new(r"\{([^}]*)\}").unwrap();
    let mut placeholders = placeholder_re.captures_iter(format).peekable();

    // A template without any placeholder is rejected up front.
    if placeholders.peek().is_none() {
        return Err(SinkError::Redis(
            "Can't find {} in key_format or value_format".to_string(),
        ));
    }

    // Look for the first placeholder whose content is not a known column name.
    let unknown_field = placeholders
        .filter_map(|caps| caps.get(1))
        .map(|inner| inner.as_str())
        .find(|name| !set.contains(*name));

    match unknown_field {
        Some(name) => Err(SinkError::Redis(format!(
            "Can't find field({:?}) in key_format or value_format",
            name
        ))),
        None => Ok(()),
    }
}
}

impl RowEncoder for TemplateEncoder {
Expand Down
156 changes: 80 additions & 76 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub use upsert::UpsertFormatter;
use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
Expand Down Expand Up @@ -92,7 +93,7 @@ impl SinkFormatterImpl {
let key_encoder = (!pk_indices.is_empty()).then(|| {
JsonEncoder::new(
schema.clone(),
Some(pk_indices),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
)
});
Expand All @@ -115,6 +116,28 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
))
}
}
}
SinkFormat::Debezium => {
Expand All @@ -131,85 +154,66 @@ impl SinkFormatterImpl {
)))
}
SinkFormat::Upsert => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}
match format_desc.encode {
SinkEncode::Json => {
let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder =
key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
}
}

pub fn new_with_redis(
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
key_format: Option<String>,
value_format: Option<String>,
) -> Result<Self> {
match (key_format, value_format) {
(Some(k), Some(v)) => {
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
k,
);
let val_encoder =
TemplateEncoder::new(schema, None, v);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyTemplate(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(key_encoder, val_encoder)))
}
}
(None, None) => {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let val_encoder = JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(key_encoder, val_encoder)))
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(
key_encoder,
val_encoder,
)))
}
_ => err_unsupported(),
}
}
_ => {
Err(SinkError::Encode("Please provide template formats for both key and value, or choose the JSON format.".to_string()))
}
}
}
}
Expand Down
Loading
Loading