Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): upsert avro with schema registry #13007

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,11 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1

echo "testing avro"
# register_schema.py needs the confluent-kafka schema-registry client.
python3 -m pip install requests confluent-kafka
# Register the value schema (full all-types schema) and the key schema
# (only the two primary-key fields) under the Confluent subject-per-topic convention.
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
# Create the topic, run the avro sink slt test, then clean the topic up.
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1
110 changes: 110 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Reader side: consume the sink's topic back into RisingWave so the sink
# output can be verified with a SELECT below.
statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

# Writer side: one column per field of the registered all-types Avro schema.
statement ok
create table into_kafka (
bool_field bool,
string_field varchar,
bytes_field bytea,
float_field real,
double_field double precision,
int32_field int,
int64_field bigint,
record_field struct<id int, name varchar>,
array_field int[][],
timestamp_micros_field timestamptz,
timestamp_millis_field timestamptz,
date_field date,
time_micros_field time,
time_millis_field time);

# Two rows covering nulls, nested nulls in arrays/records, and both
# micros/millis temporal variants.
statement ok
insert into into_kafka values
(true, 'Rising', 'a0', 3.5, 4.25, 22, 23, null, array[array[null, 3], null, array[7, null, 2]], '2006-01-02 15:04:05-07:00', null, null, '12:34:56.123456', null),
(false, 'Wave', 'ZDF', 1.5, null, 11, 12, row(null::int, 'foo'), null, null, '2006-01-02 15:04:05-07:00', '2021-04-01', null, '23:45:16.654321');

statement ok
flush;

# Happy path: upsert avro sink resolving both key and value schemas from the registry.
statement ok
create sink sink0 from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

# Give the sink time to produce and the source time to consume.
sleep 2s

query TTTRRIITTTTTTTT
select
bool_field,
string_field,
bytes_field,
float_field,
double_field,
int32_field,
int64_field,
record_field,
array_field,
timestamp_micros_field,
timestamp_millis_field,
date_field,
time_micros_field,
time_millis_field from from_kafka;
----
t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL
f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654

# Error: no schema registered under the subjects derived from this topic.
statement error SchemaFetchError
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro-err',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

# Error: sink query exposes a column absent from the registered Avro schema.
statement error encode extra_column error: field not in avro
create sink sink_err as select 1 as extra_column, * from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081');

# Error: unknown subject name strategy value.
statement error unrecognized
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry.name.strategy = 'typo');

# Error: record_name_strategy requires an explicit key.message option.
statement error empty field key.message
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field,string_field')
format upsert encode avro (
schema.registry = 'http://message_queue:8081',
schema.registry.name.strategy = 'record_name_strategy');

# Clean up everything created by this file.
statement ok
drop sink sink0;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka;
9 changes: 9 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

# Clean up the sink and tables created earlier in this test file.
statement ok
drop sink sink0;

statement ok
drop table into_kafka;

statement ok
drop table from_kafka;
48 changes: 48 additions & 0 deletions e2e_test/sink/kafka/register_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
import sys

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema


def main():
    """Register an Avro schema with a Confluent schema registry.

    argv: <registry-url> <subject> <path-to-avsc> [comma-separated key fields]

    When key fields are given, only those fields are kept (key schema);
    otherwise fields the sink tests cannot encode are stripped (value schema).
    """
    url = sys.argv[1]
    subject = sys.argv[2]
    keys = sys.argv[4].split(',') if len(sys.argv) > 4 else []

    with open(sys.argv[3]) as f:
        schema_str = f.read()

    schema_str = select_keys(schema_str, keys) if keys else remove_unsupported(schema_str)

    client = SchemaRegistryClient({"url": url})
    client.register_schema(subject, Schema(schema_str, 'AVRO'))


def select_fields(schema_str, f):
    """Apply `f` to the field list of a top-level Avro record schema.

    `f` receives the list of field dicts and returns the replacement list.
    Anything that is not a top-level record (non-dict JSON such as a union
    or primitive name, or a dict without ``"type": "record"``) is returned
    unchanged, so callers may pass arbitrary .avsc content through safely.
    """
    root = json.loads(schema_str)
    if not isinstance(root, dict):
        return schema_str
    # .get() instead of indexing: a dict lacking 'type' falls through
    # unchanged rather than raising KeyError.
    if root.get('type') != 'record':
        return schema_str
    root['fields'] = f(root['fields'])
    return json.dumps(root)


def remove_unsupported(schema_str):
    """Drop record fields that the sink e2e tests cannot encode yet."""
    skipped = {'unsupported', 'mon_day_sec_field'}
    return select_fields(
        schema_str,
        lambda fields: [field for field in fields if field['name'] not in skipped],
    )


def select_keys(schema_str, keys):
    """Keep only the named record fields, ordered as listed in `keys`."""
    def pick(fields):
        by_name = {field['name']: field for field in fields}
        return [by_name[name] for name in keys]

    return select_fields(schema_str, pick)


# CLI entry point; invoked from ci/scripts/e2e-kafka-sink-test.sh.
if __name__ == '__main__':
    main()
105 changes: 105 additions & 0 deletions src/connector/src/schema/avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use apache_avro::Schema as AvroSchema;
use risingwave_pb::catalog::PbSchemaRegistryNameStrategy;

use super::schema_registry::{
get_subject_by_strategy, handle_sr_list, name_strategy_from_str, Client, ConfluentSchema,
SchemaRegistryAuth,
};
use super::{
SchemaFetchError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY, NAME_STRATEGY_KEY,
SCHEMA_REGISTRY_KEY,
};

/// A parsed Avro schema together with the id it was registered under in the
/// schema registry (see [`TryFrom<ConfluentSchema>`] below for construction).
pub struct SchemaWithId {
    pub schema: Arc<AvroSchema>,
    // Registry-assigned schema id; presumably embedded alongside encoded
    // messages per the Confluent wire format -- TODO confirm at the encoder.
    pub id: i32,
}

impl TryFrom<ConfluentSchema> for SchemaWithId {
    type Error = SchemaFetchError;

    /// Parses the fetched schema text as Avro, keeping the registry id;
    /// a parse failure is surfaced as a [`SchemaFetchError`].
    fn try_from(fetched: ConfluentSchema) -> Result<Self, Self::Error> {
        match AvroSchema::parse_str(&fetched.content) {
            Ok(parsed) => Ok(Self {
                schema: Arc::new(parsed),
                id: fetched.id,
            }),
            Err(e) => Err(SchemaFetchError(e.to_string())),
        }
    }
}

/// Schema registry only
pub async fn fetch_schema(
format_options: &BTreeMap<String, String>,
topic: &str,
) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> {
Comment on lines +49 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy from the source part?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will be merged eventually.

let schema_location = format_options
.get(SCHEMA_REGISTRY_KEY)
.ok_or_else(|| SchemaFetchError(format!("{SCHEMA_REGISTRY_KEY} required")))?
.clone();
let client_config = format_options.into();
let name_strategy = format_options
.get(NAME_STRATEGY_KEY)
.map(|s| {
name_strategy_from_str(s)
.ok_or_else(|| SchemaFetchError(format!("unrecognized strategy {s}")))
})
.transpose()?
.unwrap_or_default();
let key_record_name = format_options
.get(KEY_MESSAGE_NAME_KEY)
.map(std::ops::Deref::deref);
let val_record_name = format_options
.get(MESSAGE_NAME_KEY)
.map(std::ops::Deref::deref);

let (key_schema, val_schema) = fetch_schema_inner(
&schema_location,
&client_config,
&name_strategy,
topic,
key_record_name,
val_record_name,
)
.await
.map_err(|e| SchemaFetchError(e.to_string()))?;

Ok((key_schema.try_into()?, val_schema.try_into()?))
}

/// Derives the key and value subjects for `topic` under `name_strategy` and
/// downloads both raw schemas from the registry at `schema_location`
/// (key first, then value).
async fn fetch_schema_inner(
    schema_location: &str,
    client_config: &SchemaRegistryAuth,
    name_strategy: &PbSchemaRegistryNameStrategy,
    topic: &str,
    key_record_name: Option<&str>,
    val_record_name: Option<&str>,
) -> Result<(ConfluentSchema, ConfluentSchema), risingwave_common::error::RwError> {
    let client = Client::new(handle_sr_list(schema_location)?, client_config)?;

    let key_schema = client
        .get_schema_by_subject(&get_subject_by_strategy(
            name_strategy,
            topic,
            key_record_name,
            true,
        )?)
        .await?;
    let val_schema = client
        .get_schema_by_subject(&get_subject_by_strategy(
            name_strategy,
            topic,
            val_record_name,
            false,
        )?)
        .await?;

    Ok((key_schema, val_schema))
}
4 changes: 4 additions & 0 deletions src/connector/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod avro;
pub mod protobuf;
pub mod schema_registry;

// Keys recognized in the ENCODE ... ( ... ) options clause.
const MESSAGE_NAME_KEY: &str = "message";
// Record name used for the key encoder (see KEY_MESSAGE_NAME_KEY usage in avro.rs).
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
const SCHEMA_LOCATION_KEY: &str = "schema.location";
const SCHEMA_REGISTRY_KEY: &str = "schema.registry";
// Subject name strategy, parsed via name_strategy_from_str in schema_registry.
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";

/// Error raised while fetching or parsing a schema; wraps a human-readable message.
#[derive(Debug)]
pub struct SchemaFetchError(pub String);
18 changes: 16 additions & 2 deletions src/connector/src/schema/schema_registry/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -43,6 +43,18 @@ impl From<&HashMap<String, String>> for SchemaRegistryAuth {
}
}

impl From<&BTreeMap<String, String>> for SchemaRegistryAuth {
fn from(props: &BTreeMap<String, String>) -> Self {
const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username";
const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password";

SchemaRegistryAuth {
username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(),
password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(),
}
}
}

/// An client for communication with schema registry
#[derive(Debug)]
pub struct Client {
Expand Down Expand Up @@ -123,7 +135,9 @@ impl Client {

Err(RwError::from(ProtocolError(format!(
"all request confluent registry all timeout, req path {:?}, urls {:?}, err: {:?}",
path, self.url, errs
path,
self.url,
errs.iter().map(|e| e.to_string()).collect_vec()
))))
}

Expand Down
Loading
Loading