Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bump-toolchain-2023-1…
Browse files Browse the repository at this point in the history
…0-21
  • Loading branch information
TennyZhuang committed Oct 24, 2023
2 parents 1f371d7 + 210ae71 commit 703052c
Show file tree
Hide file tree
Showing 85 changed files with 2,521 additions and 2,173 deletions.
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,4 @@ etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev =
# Patch for coverage_attribute.
# https://github.com/sgodwincs/dlv-list-rs/pull/19#issuecomment-1774786289
dlv-list = { git = "https://github.com/sgodwincs/dlv-list-rs.git", rev = "5bbc5d0" }
ordered-multimap = { git = "https://github.com/risingwavelabs/ordered-multimap-rs.git", rev = "c315112" }
ordered-multimap = { git = "https://github.com/risingwavelabs/ordered-multimap-rs.git", rev = "19c743f" }
1 change: 1 addition & 0 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ chmod +x ./risingwave_simulation

export RUST_LOG="info,\
risingwave_meta::barrier::recovery=debug,\
risingwave_meta::manager::catalog=debug,\
risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug"
Expand Down
10 changes: 5 additions & 5 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: "3"
services:
compactor-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.3.0}"
command:
- compactor-node
- "--listen-addr"
Expand Down Expand Up @@ -37,7 +37,7 @@ services:
timeout: 5s
retries: 5
compute-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.3.0}"
command:
- compute-node
- "--listen-addr"
Expand Down Expand Up @@ -122,7 +122,7 @@ services:
timeout: 5s
retries: 5
frontend-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.3.0}"
command:
- frontend-node
- "--listen-addr"
Expand Down Expand Up @@ -179,7 +179,7 @@ services:
timeout: 5s
retries: 5
meta-node-0:
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}"
image: "ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.3.0}"
command:
- meta-node
- "--listen-addr"
Expand Down Expand Up @@ -295,7 +295,7 @@ services:
timeout: 5s
retries: 5
connector-node:
image: ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.2.0}
image: ghcr.io/risingwavelabs/risingwave:${RW_IMAGE_VERSION:-v1.3.0}
entrypoint: "/risingwave/bin/connector-node/start-service.sh"
ports:
- 50051
Expand Down
10 changes: 2 additions & 8 deletions integration_tests/redis-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
);
)FORMAT PLAIN ENCODE JSON(force_append_only='true');

CREATE SINK bhv_redis_sink_2
FROM
bhv_mv WITH (
primary_key = 'user_id',
connector = 'redis',
type = 'append-only',
force_append_only='true',
redis.url= 'redis://127.0.0.1:6379/',
redis.keyformat='user_id:{user_id}',
redis.valueformat='username:{username},event_timestamp{event_timestamp}'
);
)FORMAT PLAIN ENCODE TEMPLATE(force_append_only='true', key_format = 'UserID:{user_id}', value_format = 'TargetID:{target_id},EventTimestamp{event_timestamp}');
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ message AggCall {
MODE = 24;
LAST_VALUE = 25;
GROUPING = 26;
INTERNAL_LAST_SEEN_VALUE = 27;
}
Type type = 1;
repeated InputRef args = 2;
Expand Down
1 change: 1 addition & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ enum EncodeType {
ENCODE_TYPE_PROTOBUF = 4;
ENCODE_TYPE_JSON = 5;
ENCODE_TYPE_BYTES = 6;
ENCODE_TYPE_TEMPLATE = 7;
}

enum RowFormatType {
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/aggregation/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl AggregateFunction for Filter {
mod tests {
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_expr::aggregate::{build_append_only, AggCall};
use risingwave_expr::expr::{build_from_pretty, Expression, LiteralExpression};
use risingwave_expr::expr::{build_from_pretty, ExpressionBoxExt, LiteralExpression};

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ mod tests {
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::test_prelude::*;
use risingwave_common::types::DataType;
use risingwave_expr::expr::{Expression, InputRefExpression, LiteralExpression};
use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
use risingwave_expr::table_function::repeat;

use super::*;
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub enum SinkEncode {
Json,
Protobuf,
Avro,
Template,
}

impl SinkFormatDesc {
Expand Down Expand Up @@ -177,6 +178,7 @@ impl SinkFormatDesc {
SinkEncode::Json => E::Json,
SinkEncode::Protobuf => E::Protobuf,
SinkEncode::Avro => E::Avro,
SinkEncode::Template => E::Template,
};
let options = self
.options
Expand Down Expand Up @@ -212,6 +214,7 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
let encode = match value.encode() {
E::Json => SinkEncode::Json,
E::Protobuf => SinkEncode::Protobuf,
E::Template => SinkEncode::Template,
E::Avro => SinkEncode::Avro,
e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => {
return Err(SinkError::Config(anyhow!(
Expand Down
22 changes: 22 additions & 0 deletions src/connector/src/sink/encoder/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use regex::Regex;
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::types::ToText;

use super::{Result, RowEncoder};
use crate::sink::SinkError;

/// Encode a row according to a specified string template `user_id:{user_id}`
pub struct TemplateEncoder {
Expand All @@ -34,6 +38,24 @@ impl TemplateEncoder {
template,
}
}

/// Validates a `key_format` / `value_format` template string.
///
/// Each `{...}` placeholder in `format` must name an entry of `set`, the set of column
/// names. Because a placeholder's content is everything between a `{` and the next `}`,
/// inputs such as `{{column_name}}` or `{{column_name1},{column_name2}}` yield a
/// placeholder that starts with `{` and are rejected as unknown fields.
///
/// # Errors
///
/// Returns `SinkError::Redis` when `format` contains no `{...}` placeholder at all, or
/// when the first placeholder that is not in `set` is encountered.
pub fn check_string_format(format: &str, set: &HashSet<String>) -> Result<()> {
    // The pattern is a fixed literal, so compiling it cannot fail.
    let placeholder = Regex::new(r"\{([^}]*)\}").unwrap();

    // Lazily walk the text captured inside every `{...}` occurrence.
    let mut names = placeholder
        .captures_iter(format)
        .filter_map(|caps| caps.get(1))
        .map(|inner| inner.as_str())
        .peekable();

    // A template without any placeholder is not usable.
    if names.peek().is_none() {
        return Err(SinkError::Redis(
            "Can't find {} in key_format or value_format".to_string(),
        ));
    }

    // Report the first placeholder that does not correspond to a known column.
    match names.find(|name| !set.contains(*name)) {
        Some(unknown) => Err(SinkError::Redis(format!(
            "Can't find field({:?}) in key_format or value_format",
            unknown
        ))),
        None => Ok(()),
    }
}
}

impl RowEncoder for TemplateEncoder {
Expand Down
156 changes: 80 additions & 76 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub use upsert::UpsertFormatter;
use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::template::TemplateEncoder;
use super::encoder::KafkaConnectParams;
use super::redis::{KEY_FORMAT, VALUE_FORMAT};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
Expand Down Expand Up @@ -92,7 +93,7 @@ impl SinkFormatterImpl {
let key_encoder = (!pk_indices.is_empty()).then(|| {
JsonEncoder::new(
schema.clone(),
Some(pk_indices),
Some(pk_indices.clone()),
TimestampHandlingMode::Milli,
)
});
Expand All @@ -115,6 +116,28 @@ impl SinkFormatterImpl {
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::AppendOnlyTemplate(
AppendOnlyFormatter::new(Some(key_encoder), val_encoder),
))
}
}
}
SinkFormat::Debezium => {
Expand All @@ -131,85 +154,66 @@ impl SinkFormatterImpl {
)))
}
SinkFormat::Upsert => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}
match format_desc.encode {
SinkEncode::Json => {
let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let mut val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder = key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
if let Some(s) = format_desc.options.get("schemas.enable") {
match s.to_lowercase().parse::<bool>() {
Ok(true) => {
let kafka_connect = KafkaConnectParams {
schema_name: format!("{}.{}", db_name, sink_from_name),
};
key_encoder =
key_encoder.with_kafka_connect(kafka_connect.clone());
val_encoder = val_encoder.with_kafka_connect(kafka_connect);
}
Ok(false) => {}
_ => {
return Err(SinkError::Config(anyhow!(
"schemas.enable is expected to be `true` or `false`, got {}",
s
)));
}
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
};

// Initialize the upsert_stream
let formatter = UpsertFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::UpsertJson(formatter))
}
}
}

pub fn new_with_redis(
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
key_format: Option<String>,
value_format: Option<String>,
) -> Result<Self> {
match (key_format, value_format) {
(Some(k), Some(v)) => {
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
k,
);
let val_encoder =
TemplateEncoder::new(schema, None, v);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyTemplate(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(key_encoder, val_encoder)))
}
}
(None, None) => {
let key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
TimestampHandlingMode::Milli,
);
let val_encoder = JsonEncoder::new(
schema,
None,
TimestampHandlingMode::Milli,
);
if is_append_only {
Ok(SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
} else {
Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(key_encoder, val_encoder)))
SinkEncode::Template => {
let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'key_format',please set it or use JSON"
))
})?;
let value_format =
format_desc.options.get(VALUE_FORMAT).ok_or_else(|| {
SinkError::Config(anyhow!(
"Cannot find 'redis_value_format',please set it or use JSON"
))
})?;
let key_encoder = TemplateEncoder::new(
schema.clone(),
Some(pk_indices),
key_format.clone(),
);
let val_encoder = TemplateEncoder::new(schema, None, value_format.clone());
Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(
key_encoder,
val_encoder,
)))
}
_ => err_unsupported(),
}
}
_ => {
Err(SinkError::Encode("Please provide template formats for both key and value, or choose the JSON format.".to_string()))
}
}
}
}
Expand Down
Loading

0 comments on commit 703052c

Please sign in to comment.