From 848bddab955b483d452f40c349c60f782f64f3a0 Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Mon, 23 Oct 2023 12:09:06 +0800
Subject: [PATCH] feat(Sink): support redis sink (#11999)

---
 Cargo.lock                                    |  15 +
 integration_tests/redis-sink/README.md        |  30 ++
 integration_tests/redis-sink/create_mv.sql    |   7 +
 integration_tests/redis-sink/create_sink.sql  |  21 +
 .../redis-sink/create_source.sql              |  16 +
 .../redis-sink/docker-compose.yml             |  71 ++++
 src/connector/Cargo.toml                      |   2 +
 src/connector/src/common.rs                   |  62 ---
 src/connector/src/sink/clickhouse.rs          |  38 +-
 src/connector/src/sink/doris.rs               |  33 +-
 src/connector/src/sink/encoder/mod.rs         |   1 +
 src/connector/src/sink/encoder/template.rs    |  69 ++++
 src/connector/src/sink/formatter/mod.rs       |  50 +++
 src/connector/src/sink/mod.rs                 |   9 +
 src/connector/src/sink/redis.rs               | 389 +++++++++++++++++-
 src/workspace-hack/Cargo.toml                 |   4 +-
 16 files changed, 731 insertions(+), 86 deletions(-)
 create mode 100644 integration_tests/redis-sink/README.md
 create mode 100644 integration_tests/redis-sink/create_mv.sql
 create mode 100644 integration_tests/redis-sink/create_sink.sql
 create mode 100644 integration_tests/redis-sink/create_source.sql
 create mode 100644 integration_tests/redis-sink/docker-compose.yml
 create mode 100644 src/connector/src/sink/encoder/template.rs

diff --git a/Cargo.lock b/Cargo.lock
index 47e8d5f09c30d..b2875296b683a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1816,7 +1816,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
 dependencies = [
  "bytes",
+ "futures-core",
  "memchr",
+ "pin-project-lite",
+ "tokio",
+ "tokio-util",
 ]
 
 [[package]]
@@ -6631,12 +6635,19 @@
 version = "0.23.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba"
 dependencies = [
+ "async-std",
+ "async-trait",
+ "bytes",
  "combine",
+ "futures-util",
  "itoa",
  "percent-encoding",
+ "pin-project-lite",
  "ryu",
  "sha1_smol",
  "socket2 0.4.9",
+ "tokio",
+ "tokio-util",
  "url",
 ]
@@ -7294,6 +7305,8 @@ dependencies = [
  "protobuf-native",
  "pulsar",
  "rand",
+ "redis",
+ "regex",
  "reqwest",
  "risingwave_common",
  "risingwave_jni_core",
@@ -10931,6 +10944,7 @@ dependencies = [
  "ahash 0.8.3",
  "allocator-api2",
  "anyhow",
+ "async-std",
  "auto_enums",
  "aws-credential-types",
  "aws-sdk-s3",
@@ -11006,6 +11020,7 @@ dependencies = [
  "rand",
  "rand_chacha",
  "rand_core",
+ "redis",
  "regex",
  "regex-automata 0.4.1",
  "regex-syntax 0.8.0",
diff --git a/integration_tests/redis-sink/README.md b/integration_tests/redis-sink/README.md
new file mode 100644
index 0000000000000..f2e5e64aec795
--- /dev/null
+++ b/integration_tests/redis-sink/README.md
@@ -0,0 +1,30 @@
+# Demo: Sinking to Redis
+
+In this demo, we want to showcase how RisingWave is able to sink data to Redis.
+
+1. Launch the cluster:
+
+```sh
+docker compose up -d
+```
+
+The cluster contains a RisingWave cluster and its necessary dependencies, a datagen service that generates the data, and a Redis instance to sink to.
+
+
+2. Execute the SQL queries in sequence:
+
+- create_source.sql
+- create_mv.sql
+- create_sink.sql
+
+3. Execute a simple query:
+
+```sh
+docker compose exec redis redis-cli keys '*'
+
+```
+We can also use `GET` to query a single value:
+
+```sh
+docker compose exec redis redis-cli get '{"user_id":10}'
+```
diff --git a/integration_tests/redis-sink/create_mv.sql b/integration_tests/redis-sink/create_mv.sql
new file mode 100644
index 0000000000000..0a803f8a2762d
--- /dev/null
+++ b/integration_tests/redis-sink/create_mv.sql
@@ -0,0 +1,7 @@
+CREATE MATERIALIZED VIEW bhv_mv AS
+SELECT
+    user_id,
+    target_id,
+    event_timestamp
+FROM
+    user_behaviors;
\ No newline at end of file
diff --git a/integration_tests/redis-sink/create_sink.sql b/integration_tests/redis-sink/create_sink.sql
new file mode 100644
index 0000000000000..03bfc2d0b0df1
--- /dev/null
+++ b/integration_tests/redis-sink/create_sink.sql
@@ -0,0 +1,21 @@
+CREATE SINK bhv_redis_sink_1
+FROM
+    bhv_mv WITH (
+    primary_key = 'user_id',
+    connector = 'redis',
+    type = 'append-only',
+    force_append_only='true',
+    redis.url= 'redis://redis:6379/'
+);
+
+CREATE SINK bhv_redis_sink_2
+FROM
+    bhv_mv WITH (
+    primary_key = 'user_id',
+    connector = 'redis',
+    type = 'append-only',
+    force_append_only='true',
+    redis.url= 'redis://redis:6379/',
+    redis.keyformat='user_id:{user_id}',
+    redis.valueformat='target_id:{target_id},event_timestamp:{event_timestamp}'
+);
\ No newline at end of file
diff --git a/integration_tests/redis-sink/create_source.sql b/integration_tests/redis-sink/create_source.sql
new file mode 100644
index 0000000000000..f64e2ccbec82a
--- /dev/null
+++ b/integration_tests/redis-sink/create_source.sql
@@ -0,0 +1,16 @@
+CREATE TABLE user_behaviors (
+    user_id INT,
+    target_id VARCHAR,
+    target_type VARCHAR,
+    event_timestamp TIMESTAMP,
+    behavior_type VARCHAR,
+    parent_target_type VARCHAR,
+    parent_target_id VARCHAR,
+    PRIMARY KEY(user_id)
+) WITH (
+    connector = 'datagen',
+    fields.user_id.kind = 'sequence',
+    fields.user_id.start = 1,
+    fields.user_id.end = 100,
+    datagen.rows.per.second = '100'
+) FORMAT PLAIN ENCODE JSON;
\ No newline at end of file
diff --git a/integration_tests/redis-sink/docker-compose.yml b/integration_tests/redis-sink/docker-compose.yml
new file mode 100644
index 0000000000000..a850f9b35c431
--- /dev/null
+++ b/integration_tests/redis-sink/docker-compose.yml
@@ -0,0 +1,71 @@
+---
+version: "3"
+services:
+  redis:
+    image: 'redis:latest'
+    expose:
+      - 6379
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 5s
+      timeout: 30s
+      retries: 50
+  compactor-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compactor-0
+  compute-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: compute-node-0
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+  frontend-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: frontend-node-0
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+  meta-node-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: meta-node-0
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+  message_queue:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: message_queue
+  datagen:
+    build: ../datagen
+    depends_on: [message_queue]
+    command:
+      - /bin/sh
+      - -c
+      - /datagen --mode clickstream --qps 2 kafka --brokers message_queue:29092
+    restart: always
+    container_name: datagen
+volumes:
+  compute-node-0:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+  message_queue:
+    external: false
+name: risingwave-compose
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index abb7486de3091..4886b1b52fcc5 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -96,6 +96,8 @@ rdkafka = { workspace = true, features = [
     "gssapi",
     "zstd",
 ] }
+redis = { version = "0.23.3", features = ["aio", "tokio-comp", "async-std-comp"] }
+regex = "1.4"
 reqwest = { version = "0.11", features = ["json"] }
 risingwave_common = { workspace = true }
 risingwave_jni_core = { workspace = true }
diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index 6d848a5036ff1..2af396f5c33b4 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -21,7 +21,6 @@ use anyhow::{anyhow, Ok};
 use async_nats::jetstream::consumer::DeliverPolicy;
 use async_nats::jetstream::{self};
 use aws_sdk_kinesis::Client as KinesisClient;
-use clickhouse::Client;
 use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
 use pulsar::{Authentication, Pulsar, TokioExecutor};
 use rdkafka::ClientConfig;
@@ -37,7 +36,6 @@ use url::Url;
 use crate::aws_auth::AwsAuthProps;
 use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::deserialize_duration_from_string;
-use crate::sink::doris_connector::DorisGet;
 use crate::sink::SinkError;
 use crate::source::nats::source::NatsOffset;
 // The file describes the common abstractions for each connector and can be used in both source and
@@ -406,66 +404,6 @@ impl KinesisCommon {
     }
 }
 
-#[derive(Deserialize, Serialize, Debug, Clone)]
-pub struct ClickHouseCommon {
-    #[serde(rename = "clickhouse.url")]
-    pub url: String,
-    #[serde(rename = "clickhouse.user")]
-    pub user: String,
-    #[serde(rename = "clickhouse.password")]
-    pub password: String,
-    #[serde(rename = "clickhouse.database")]
-    pub database: String,
-    #[serde(rename = "clickhouse.table")]
-    pub table: String,
-}
-
-const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
-
-impl ClickHouseCommon {
-    pub(crate) fn build_client(&self) -> anyhow::Result<Client> {
-        use hyper_tls::HttpsConnector;
-
-        let https = HttpsConnector::new();
-        let client = hyper::Client::builder()
-            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
-            .build::<_, hyper::Body>(https);
-
-        let client = Client::with_http_client(client)
-            .with_url(&self.url)
-            .with_user(&self.user)
-            .with_password(&self.password)
-            .with_database(&self.database);
-        Ok(client)
-    }
-}
-
-#[derive(Deserialize, Serialize, Debug, Clone)]
-pub struct DorisCommon {
-    #[serde(rename = "doris.url")]
-    pub url: String,
-    #[serde(rename = "doris.user")]
-    pub user: String,
-    #[serde(rename = "doris.password")]
-    pub password: String,
-    #[serde(rename = "doris.database")]
-    pub database: String,
-    #[serde(rename = "doris.table")]
-    pub table: String,
-}
-
-impl DorisCommon {
-    pub(crate) fn build_get_client(&self) -> DorisGet {
-        DorisGet::new(
-            self.url.clone(),
-            self.table.clone(),
-            self.database.clone(),
-            self.user.clone(),
-            self.password.clone(),
-        )
-    }
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct UpsertMessage<'a> {
     #[serde(borrow)]
diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs
index b9733863feccb..2bddf8026216f 100644
--- a/src/connector/src/sink/clickhouse.rs
+++ b/src/connector/src/sink/clickhouse.rs
@@ -14,9 +14,10 @@
 
 use core::fmt::Debug;
 use std::collections::{HashMap, HashSet};
+use std::time::Duration;
 
 use anyhow::anyhow;
-use clickhouse::{Client, Row as ClickHouseRow};
+use clickhouse::{Client, Client as ClickHouseClient, Row as ClickHouseRow};
 use itertools::Itertools;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
@@ -28,7 +29,6 @@ use serde_derive::Deserialize;
 use serde_with::serde_as;
 
 use super::{DummySinkCommitCoordinator, SinkWriterParam};
-use crate::common::ClickHouseCommon;
 use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt};
 use crate::sink::{
     Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
@@ -37,6 +37,40 @@ use crate::sink::{
 pub const CLICKHOUSE_SINK: &str = "clickhouse";
 const BUFFER_SIZE: usize = 1024;
 
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct ClickHouseCommon {
+    #[serde(rename = "clickhouse.url")]
+    pub url: String,
+    #[serde(rename = "clickhouse.user")]
+    pub user: String,
+    #[serde(rename = "clickhouse.password")]
+    pub password: String,
+    #[serde(rename = "clickhouse.database")]
+    pub database: String,
+    #[serde(rename = "clickhouse.table")]
+    pub table: String,
+}
+
+const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(5);
+
+impl ClickHouseCommon {
+    pub(crate) fn build_client(&self) -> anyhow::Result<ClickHouseClient> {
+        use hyper_tls::HttpsConnector;
+
+        let https = HttpsConnector::new();
+        let client = hyper::Client::builder()
+            .pool_idle_timeout(POOL_IDLE_TIMEOUT)
+            .build::<_, hyper::Body>(https);
+
+        let client = ClickHouseClient::with_http_client(client)
+            .with_url(&self.url)
+            .with_user(&self.user)
+            .with_password(&self.password)
+            .with_database(&self.database);
+        Ok(client)
+    }
+}
+
 #[serde_as]
 #[derive(Clone, Debug, Deserialize)]
 pub struct ClickHouseConfig {
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index 0a32e4406108b..113c7275c6d4e 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -22,12 +22,14 @@ use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
 use risingwave_common::types::DataType;
 use serde::Deserialize;
+use serde_derive::Serialize;
 use serde_json::Value;
 use serde_with::serde_as;
 
-use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_DELETE_SIGN};
+use super::doris_connector::{
+    DorisField, DorisGet, DorisInsert, DorisInsertClient, DORIS_DELETE_SIGN,
+};
 use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
-use crate::common::DorisCommon;
 use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
@@ -35,6 +37,33 @@ use crate::sink::{
 };
 
 pub const DORIS_SINK: &str = "doris";
+
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct DorisCommon {
+    #[serde(rename = "doris.url")]
+    pub url: String,
+    #[serde(rename = "doris.user")]
+    pub user: String,
+    #[serde(rename = "doris.password")]
+    pub password: String,
+    #[serde(rename = "doris.database")]
+    pub database: String,
+    #[serde(rename = "doris.table")]
+    pub table: String,
+}
+
+impl DorisCommon {
+    pub(crate) fn build_get_client(&self) -> DorisGet {
+        DorisGet::new(
+            self.url.clone(),
+            self.table.clone(),
+            self.database.clone(),
+            self.user.clone(),
+            self.password.clone(),
+        )
+    }
+}
+
 #[serde_as]
 #[derive(Clone, Debug, Deserialize)]
 pub struct DorisConfig {
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 6a2558f5fc35a..83b2ab4f09df0 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -23,6 +23,7 @@ use crate::sink::Result;
 mod avro;
 mod json;
 mod proto;
+pub mod template;
 
 pub use avro::AvroEncoder;
 pub use json::JsonEncoder;
diff --git a/src/connector/src/sink/encoder/template.rs b/src/connector/src/sink/encoder/template.rs
new file mode 100644
index 0000000000000..85f085989b6c4
--- /dev/null
+++ b/src/connector/src/sink/encoder/template.rs
@@ -0,0 +1,69 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_common::catalog::Schema;
+use risingwave_common::row::Row;
+use risingwave_common::types::ToText;
+
+use super::{Result, RowEncoder};
+
+/// Encode a row according to a specified string template, e.g. `user_id:{user_id}`
+pub struct TemplateEncoder {
+    schema: Schema,
+    col_indices: Option<Vec<usize>>,
+    template: String,
+}
+
+/// TODO: improve the performance.
+impl TemplateEncoder {
+    pub fn new(schema: Schema, col_indices: Option<Vec<usize>>, template: String) -> Self {
+        Self {
+            schema,
+            col_indices,
+            template,
+        }
+    }
+}
+
+impl RowEncoder for TemplateEncoder {
+    type Output = String;
+
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn col_indices(&self) -> Option<&[usize]> {
+        self.col_indices.as_ref().map(Vec::as_ref)
+    }
+
+    fn encode_cols(
+        &self,
+        row: impl Row,
+        col_indices: impl Iterator<Item = usize>,
+    ) -> Result<Self::Output> {
+        let mut s = self.template.to_string();
+
+        for idx in col_indices {
+            let field = &self.schema[idx];
+            let name = &field.name;
+            let data = row.datum_at(idx);
+            // TODO: timestamptz ToText also depends on TimeZone
+            s = s.replace(
+                &format!("{{{}}}", name),
+                &data.to_text_with_type(&field.data_type),
+            );
+        }
+        Ok(s)
+    }
+}
diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs
index 5a1c7379795d5..a7463f7e3b306 100644
--- a/src/connector/src/sink/formatter/mod.rs
+++ b/src/connector/src/sink/formatter/mod.rs
@@ -27,6 +27,7 @@ use risingwave_common::catalog::Schema;
 pub use upsert::UpsertFormatter;
 
 use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
+use super::encoder::template::TemplateEncoder;
 use super::encoder::KafkaConnectParams;
 use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};
 
@@ -66,6 +67,8 @@ pub enum SinkFormatterImpl {
     AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
     UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
     DebeziumJson(DebeziumJsonFormatter),
+    AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
+    UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
 }
 
 impl SinkFormatterImpl {
@@ -164,6 +167,51 @@
             }
         }
     }
+
+    pub fn new_with_redis(
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+        key_format: Option<String>,
+        value_format: Option<String>,
+    ) -> Result<Self> {
+        match (key_format, value_format) {
+            (Some(k), Some(v)) => {
+                let key_encoder = TemplateEncoder::new(
+                    schema.clone(),
+                    Some(pk_indices),
+                    k,
+                );
+                let val_encoder =
+                    TemplateEncoder::new(schema, None, v);
+                if is_append_only {
+                    Ok(SinkFormatterImpl::AppendOnlyTemplate(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
+                } else {
+                    Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new(key_encoder, val_encoder)))
+                }
+            }
+            (None, None) => {
+                let key_encoder = JsonEncoder::new(
+                    schema.clone(),
+                    Some(pk_indices),
+                    TimestampHandlingMode::Milli,
+                );
+                let val_encoder = JsonEncoder::new(
+                    schema,
+                    None,
+                    TimestampHandlingMode::Milli,
+                );
+                if is_append_only {
+                    Ok(SinkFormatterImpl::AppendOnlyJson(AppendOnlyFormatter::new(Some(key_encoder), val_encoder)))
+                } else {
+                    Ok(SinkFormatterImpl::UpsertJson(UpsertFormatter::new(key_encoder, val_encoder)))
+                }
+            }
+            _ => {
+                Err(SinkError::Encode("Please provide template formats for both key and value, or choose the JSON format.".to_string()))
+            }
+        }
+    }
 }
 
 #[macro_export]
@@ -174,6 +222,8 @@ macro_rules! dispatch_sink_formatter_impl {
         SinkFormatterImpl::AppendOnlyProto($name) => $body,
         SinkFormatterImpl::UpsertJson($name) => $body,
         SinkFormatterImpl::DebeziumJson($name) => $body,
+        SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
+        SinkFormatterImpl::UpsertTemplate($name) => $body,
     }
 };
 }
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 053467d9f125d..7769a87f4e715 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -37,6 +37,7 @@ use std::collections::HashMap;
 use std::future::Future;
 
 use ::clickhouse::error::Error as ClickHouseError;
+use ::redis::RedisError;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use risingwave_common::buffer::Bitmap;
@@ -383,6 +384,8 @@
     Coordinator(anyhow::Error),
     #[error("ClickHouse error: {0}")]
     ClickHouse(String),
+    #[error("Redis error: {0}")]
+    Redis(String),
     #[error("Nats error: {0}")]
     Nats(anyhow::Error),
     #[error("Doris http error: {0}")]
@@ -413,6 +416,12 @@ impl From<ClickHouseError> for SinkError {
     }
 }
 
+impl From<RedisError> for SinkError {
+    fn from(value: RedisError) -> Self {
+        SinkError::Redis(format!("{}", value))
+    }
+}
+
 impl From<SinkError> for RwError {
     fn from(e: SinkError) -> Self {
         ErrorCode::SinkError(Box::new(e)).into()
diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs
index e85e984821b15..cc8ff74d0c9c5 100644
--- a/src/connector/src/sink/redis.rs
+++ b/src/connector/src/sink/redis.rs
@@ -12,25 +12,114 @@ // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{HashMap, HashSet};
+
+use anyhow::anyhow;
 use async_trait::async_trait;
+use redis::aio::Connection;
+use redis::{Client as RedisClient, Pipeline};
+use regex::Regex;
 use risingwave_common::array::StreamChunk;
+use risingwave_common::catalog::Schema;
+use serde_derive::{Deserialize, Serialize};
+use serde_with::serde_as;
+
+use super::formatter::SinkFormatterImpl;
+use super::writer::FormattedSink;
+use super::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
+use crate::dispatch_sink_formatter_impl;
+use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
+use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
 
-use crate::sink::writer::LogSinkerOf;
-use crate::sink::{
-    DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam,
-};
+pub const REDIS_SINK: &str = "redis";
 
-#[derive(Clone, Debug)]
-pub struct RedisConfig;
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct RedisCommon {
+    #[serde(rename = "redis.url")]
+    pub url: String,
+    #[serde(rename = "redis.keyformat")]
+    pub key_format: Option<String>,
+    #[serde(rename = "redis.valueformat")]
+    pub value_format: Option<String>,
+}
+
+impl RedisCommon {
+    pub(crate) fn build_client(&self) -> anyhow::Result<RedisClient> {
+        let client = RedisClient::open(self.url.clone())?;
+        Ok(client)
+    }
+}
+#[serde_as]
+#[derive(Clone, Debug, Deserialize)]
+pub struct RedisConfig {
+    #[serde(flatten)]
+    pub common: RedisCommon,
+
+    pub r#type: String, // accept "append-only" or "upsert"
+}
+
+impl RedisConfig {
+    pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
+        let config =
+            serde_json::from_value::<RedisConfig>(serde_json::to_value(properties).unwrap())
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
+            return Err(SinkError::Config(anyhow!(
+                "`{}` must be {}, or {}",
+                SINK_TYPE_OPTION,
+                SINK_TYPE_APPEND_ONLY,
+                SINK_TYPE_UPSERT
+            )));
+        }
+        Ok(config)
+    }
+}
 
 #[derive(Debug)]
-pub struct RedisSink;
+pub struct RedisSink {
+    config: RedisConfig,
+    schema: Schema,
+    is_append_only: bool,
+    pk_indices: Vec<usize>,
+}
+
+fn check_string_format(format: &Option<String>, set: &HashSet<String>) -> Result<()> {
+    if let Some(format) = format {
+        // We will check if the string inside {} corresponds to a column name in rw.
+        // In other words, the content within {} should exclusively consist of column names from rw,
+        // which means '{{column_name}}' or '{{column_name1},{column_name2}}' would be incorrect.
+        let re = Regex::new(r"\{([^}]*)\}").unwrap();
+        if !re.is_match(format) {
+            return Err(SinkError::Redis(
+                "Can't find {} in key_format or value_format".to_string(),
+            ));
+        }
+        for capture in re.captures_iter(format) {
+            if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()) {
+                return Err(SinkError::Redis(format!(
+                    "Can't find field({:?}) in key_format or value_format",
+                    inner_content.as_str()
+                )));
+            }
+        }
+    }
+    Ok(())
+}
 
+#[async_trait]
 impl TryFrom<SinkParam> for RedisSink {
     type Error = SinkError;
 
-    fn try_from(_param: SinkParam) -> std::result::Result<Self, Self::Error> {
-        todo!()
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        if param.downstream_pk.is_empty() {
+            return Err(SinkError::Config(anyhow!(
+                "Redis Sink Primary Key must be specified."
+            )));
+        }
+        let config = RedisConfig::from_hashmap(param.properties.clone())?;
+        Ok(Self {
+            config,
+            schema: param.schema(),
+            is_append_only: param.sink_type.is_append_only(),
+            pk_indices: param.downstream_pk,
+        })
     }
 }
 
@@ -40,28 +129,290 @@ impl Sink for RedisSink {
 
     const SINK_NAME: &'static str = "redis";
 
-    async fn new_log_sinker(&self, _writer_env: SinkWriterParam) -> Result<Self::LogSinker> {
-        todo!()
+    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
+        Ok(RedisSinkWriter::new(
+            self.config.clone(),
+            self.schema.clone(),
+            self.pk_indices.clone(),
+            self.is_append_only,
+        )
+        .await?
+        .into_log_sinker(writer_param.sink_metrics))
     }
 
     async fn validate(&self) -> Result<()> {
-        todo!()
+        let client = self.config.common.build_client()?;
+        client.get_connection()?;
+        let all_set: HashSet<String> = self
+            .schema
+            .fields()
+            .iter()
+            .map(|f| f.name.clone())
+            .collect();
+        let pk_set: HashSet<String> = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(k, _)| self.pk_indices.contains(k))
+            .map(|(_, v)| v.name.clone())
+            .collect();
+        check_string_format(&self.config.common.key_format, &pk_set)?;
+        check_string_format(&self.config.common.value_format, &all_set)?;
+        Ok(())
     }
 }
 
-pub struct RedisSinkWriter;
+pub struct RedisSinkWriter {
+    epoch: u64,
+    schema: Schema,
+    is_append_only: bool,
+    pk_indices: Vec<usize>,
+    formatter: SinkFormatterImpl,
+    payload_writer: RedisSinkPayloadWriter,
+}
+
+struct RedisSinkPayloadWriter {
+    // connection to redis, one per executor
+    conn: Option<Connection>,
+    // the command pipeline for write-commit
+    pipe: Pipeline,
+}
+impl RedisSinkPayloadWriter {
+    pub async fn new(config: RedisConfig) -> Result<Self> {
+        let client = config.common.build_client()?;
+        let conn = Some(client.get_async_connection().await?);
+        let pipe = redis::pipe();
+
+        Ok(Self { conn, pipe })
+    }
+
+    #[cfg(test)]
+    pub fn mock() -> Self {
+        let conn = None;
+        let pipe = redis::pipe();
+        Self { conn, pipe }
+    }
+
+    pub async fn commit(&mut self) -> Result<()> {
+        self.pipe.query_async::<_, ()>(self.conn.as_mut().unwrap()).await?;
+        self.pipe.clear();
+        Ok(())
+    }
+}
+
+impl FormattedSink for RedisSinkPayloadWriter {
+    type K = String;
+    type V = Vec<u8>;
+
+    async fn write_one(&mut self, k: Option<Self::K>, v: Option<Self::V>) -> Result<()> {
+        let k = k.unwrap();
+        match v {
+            Some(v) => self.pipe.set(k, v),
+            None => self.pipe.del(k),
+        };
+        Ok(())
+    }
+}
+
+impl RedisSinkWriter {
+    pub async fn new(
+        config: RedisConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let payload_writer = RedisSinkPayloadWriter::new(config.clone()).await?;
+        let formatter = SinkFormatterImpl::new_with_redis(
+            schema.clone(),
+            pk_indices.clone(),
+            is_append_only,
+            config.common.key_format,
+            config.common.value_format,
+        )?;
+
+        Ok(Self {
+            schema,
+            pk_indices,
+            is_append_only,
+            epoch: 0,
+            formatter,
+            payload_writer,
+        })
+    }
+
+    #[cfg(test)]
+    pub fn mock(
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+        key_format: Option<String>,
+        value_format: Option<String>,
+    ) -> Result<Self> {
+        let formatter = SinkFormatterImpl::new_with_redis(
+            schema.clone(),
+            pk_indices.clone(),
+            is_append_only,
+            key_format,
+            value_format,
+        )?;
+        Ok(Self {
+            schema,
+            pk_indices,
+            is_append_only,
+            epoch: 0,
+            formatter,
+            payload_writer: RedisSinkPayloadWriter::mock(),
+        })
+    }
+}
 
 #[async_trait]
 impl SinkWriter for RedisSinkWriter {
-    async fn write_batch(&mut self, _chunk: StreamChunk) -> Result<()> {
-        todo!();
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        dispatch_sink_formatter_impl!(&self.formatter, formatter, {
+            self.payload_writer.write_chunk(chunk, formatter).await
+        })
     }
 
-    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
-        todo!()
+    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
+        self.epoch = epoch;
+        Ok(())
     }
 
-    async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> {
-        todo!()
+    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
+        if is_checkpoint {
+            self.payload_writer.commit().await?;
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use rdkafka::message::FromBytes;
+    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
+    use risingwave_common::catalog::{Field, Schema};
+    use risingwave_common::types::DataType;
+    use risingwave_common::util::iter_util::ZipEqDebug;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_write() {
+        let schema = Schema::new(vec![
+            Field {
+                data_type: DataType::Int32,
+                name: "id".to_string(),
+                sub_fields: vec![],
+                type_name: "string".to_string(),
+            },
+            Field {
+                data_type: DataType::Varchar,
+                name: "name".to_string(),
+                sub_fields: vec![],
+                type_name: "string".to_string(),
+            },
+        ]);
+
+        let mut redis_sink_writer =
+            RedisSinkWriter::mock(schema, vec![0], true, None, None).unwrap();
+
+        let chunk_a = StreamChunk::new(
+            vec![Op::Insert, Op::Insert, Op::Insert],
+            vec![
+                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
+                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
+            ],
+        );
+
+        redis_sink_writer
+            .write_batch(chunk_a)
+            .await
+            .expect("failed to write batch");
+        let expected_a =
+            vec![
+                (0, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":1}\r\n$23\r\n{\"id\":1,\"name\":\"Alice\"}\r\n"),
+                (1, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":2}\r\n$21\r\n{\"id\":2,\"name\":\"Bob\"}\r\n"),
+                (2, "*3\r\n$3\r\nSET\r\n$8\r\n{\"id\":3}\r\n$23\r\n{\"id\":3,\"name\":\"Clare\"}\r\n"),
+            ];
+
+        redis_sink_writer
+            .payload_writer
+            .pipe
+            .cmd_iter()
+            .enumerate()
+            .zip_eq_debug(expected_a.clone())
+            .for_each(|((i, cmd), (exp_i, exp_cmd))| {
+                if exp_i == i {
+                    assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
+                }
+            });
+    }
+
+    #[tokio::test]
+    async fn test_format_write() {
+        let schema = Schema::new(vec![
+            Field {
+                data_type: DataType::Int32,
+                name: "id".to_string(),
+                sub_fields: vec![],
+                type_name: "string".to_string(),
+            },
+            Field {
+                data_type: DataType::Varchar,
+                name: "name".to_string(),
+                sub_fields: vec![],
+                type_name: "string".to_string(),
+            },
+        ]);
+
+        let mut redis_sink_writer = RedisSinkWriter::mock(
+            schema,
+            vec![0],
+            true,
+            Some("key-{id}".to_string()),
+            Some("values:{id:{id},name:{name}}".to_string()),
+        )
+        .unwrap();
+
+        let chunk_a = StreamChunk::new(
+            vec![Op::Insert, Op::Insert, Op::Insert],
+            vec![
+                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
+                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
+            ],
+        );
+
+        redis_sink_writer
+            .write_batch(chunk_a)
+            .await
+            .expect("failed to write batch");
+        let expected_a = vec![
+            (
+                0,
+                "*3\r\n$3\r\nSET\r\n$5\r\nkey-1\r\n$24\r\nvalues:{id:1,name:Alice}\r\n",
+            ),
+            (
+                1,
+                "*3\r\n$3\r\nSET\r\n$5\r\nkey-2\r\n$22\r\nvalues:{id:2,name:Bob}\r\n",
+            ),
+            (
+                2,
+                "*3\r\n$3\r\nSET\r\n$5\r\nkey-3\r\n$24\r\nvalues:{id:3,name:Clare}\r\n",
+            ),
+        ];
+
+        redis_sink_writer
+            .payload_writer
+            .pipe
+            .cmd_iter()
+            .enumerate()
+            .zip_eq_debug(expected_a.clone())
+            .for_each(|((i, cmd), (exp_i, exp_cmd))| {
+                if exp_i == i {
+                    assert_eq!(exp_cmd, str::from_bytes(&cmd.get_packed_command()).unwrap())
+                }
+            });
+    }
+}
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 0826ea1014e3d..2ae671ca2de93 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -21,6 +21,7 @@ publish = false
 ahash = { version = "0.8" }
 allocator-api2 = { version = "0.2", default-features = false, features = ["alloc", "nightly"] }
 anyhow = { version = "1", features = ["backtrace"] }
+async-std = { version = "1", features = ["attributes", "tokio1"] }
 aws-credential-types = { version = "0.55", default-features = false, features = ["hardcoded-credentials"] }
 aws-sdk-s3 = { version = "0.28", features = ["native-tls"] }
 aws-smithy-client = { version = "0.55", default-features = false, features = ["native-tls", "rustls"] }
@@ -32,7 +33,7 @@ bytes = { version = "1", features = ["serde"] }
 chrono = { version = "0.4", features = ["serde"] }
 clap = { version = "4", features = ["cargo", "derive", "env"] }
 clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] }
-combine = { version = "4" }
+combine = { version = "4", features = ["tokio"] }
 crossbeam-epoch = { version = "0.9" }
 crossbeam-queue = { version = "0.3" }
 crossbeam-utils = { version = "0.8" }
@@ -93,6 +94,7 @@ prost-types = { version = "0.12" }
 rand = { version = "0.8", features = ["small_rng"] }
 rand_chacha = { version = "0.3" }
 rand_core = { version = "0.6", default-features = false, features = ["std"] }
+redis = { version = "0.23", features = ["async-std-comp", "tokio-comp"] }
 regex = { version = "1" }
 regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
 regex-syntax = { version = "0.8" }
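
As a quick illustration of the `{column}` substitution that `TemplateEncoder::encode_cols` performs above, here is a minimal standalone sketch. The `render` helper and the literal row values are assumptions made for the example; the real encoder reads column names from a RisingWave `Schema` and values from a `Row`, and `check_string_format` validates the placeholders up front.

```rust
// Minimal sketch of the `{column}` template substitution used by the Redis
// sink's TemplateEncoder. `render` and the hard-coded row below are
// illustrative only; they are not part of this patch.
use std::collections::HashMap;

/// Replace each `{column}` placeholder with that column's text value,
/// using the same `str::replace` loop as `TemplateEncoder::encode_cols`.
fn render(template: &str, row: &HashMap<&str, &str>) -> String {
    let mut s = template.to_string();
    for (name, value) in row {
        s = s.replace(&format!("{{{}}}", name), value);
    }
    s
}

fn main() {
    let row = HashMap::from([("user_id", "10"), ("target_id", "514")]);

    // With redis.keyformat = 'user_id:{user_id}', the sink would issue
    // SET user_id:10 <rendered value template> for this row.
    assert_eq!(render("user_id:{user_id}", &row), "user_id:10");
    assert_eq!(render("target_id:{target_id}", &row), "target_id:514");
    println!("template rendering ok");
}
```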