refactor(source): remove upsert_avro_primary_key and separate extracting columns and bind pk #13023

Merged

merged 9 commits on Oct 30, 2023

Changes from all commits
2 changes: 1 addition & 1 deletion e2e_test/source/basic/ddl.slt
@@ -28,7 +28,7 @@ create source invalid_startup_timestamp (
properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON
statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry.
create source invalid_schema_definition
with (
connector = 'kafka',
78 changes: 25 additions & 53 deletions e2e_test/source/basic/nosim_kafka.slt
@@ -30,26 +30,17 @@ WITH (
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');


statement ok
CREATE TABLE upsert_avro_json (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
# TODO: Uncomment this when we add test data kafka key with format `"ID":id`
# statement ok
# CREATE TABLE upsert_avro_json (
# PRIMARY KEY("ID")
# )
# WITH (
# connector = 'kafka',
# properties.bootstrap.server = 'message_queue:29092',
# topic = 'upsert_avro_json')
# FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');

# Just ignore the kafka key, it works
statement ok
CREATE TABLE upsert_avro_json2 (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');

statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
@@ -89,7 +80,7 @@ statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 5s
sleep 8s

query II
SELECT
@@ -104,33 +95,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
*
FROM
upsert_avro_json
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
SELECT
*
FROM
upsert_avro_json2
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

# query II
# SELECT
# *
# FROM
# upsert_avro_json
# ORDER BY
# "ID";
# ----
# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
@@ -172,12 +148,8 @@ select * from kafka_json_schema_upsert order by id
statement ok
DROP TABLE upsert_avro_json_default_key;

statement ok
DROP TABLE upsert_avro_json;


statement ok
DROP TABLE upsert_avro_json2;
# statement ok
# DROP TABLE upsert_avro_json;

statement ok
DROP TABLE upsert_student_avro_json;
2 changes: 1 addition & 1 deletion e2e_test/source/basic/old_row_format_syntax/ddl.slt
@@ -28,7 +28,7 @@ create source invalid_startup_timestamp (
properties.bootstrap.server = 'message_queue:29092'
) ROW FORMAT JSON;

statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON
statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry.
create source invalid_schema_definition
with (
connector = 'kafka',
81 changes: 26 additions & 55 deletions e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
@@ -33,28 +33,17 @@ ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'


statement ok
CREATE TABLE upsert_avro_json (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

# Just ignore the kafka key, it works
statement ok
CREATE TABLE upsert_avro_json2 (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'
# TODO: Uncomment this when we add test data kafka key with format `"ID":id`
# statement ok
# CREATE TABLE upsert_avro_json (
# PRIMARY KEY("ID")
# )
# WITH (
# connector = 'kafka',
# properties.bootstrap.server = 'message_queue:29092',
# topic = 'upsert_avro_json')
# ROW FORMAT UPSERT_AVRO
# row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
@@ -65,7 +54,6 @@ CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081';



statement ok
CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
@@ -78,7 +66,7 @@ statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 5s
sleep 10s

query II
SELECT
@@ -93,32 +81,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
*
FROM
upsert_avro_json
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
SELECT
*
FROM
upsert_avro_json2
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# query II
# SELECT
# *
# FROM
# upsert_avro_json
# ORDER BY
# "ID";
# ----
# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
@@ -150,12 +124,9 @@ select count(*) from debezium_compact;
statement ok
DROP TABLE upsert_avro_json_default_key;

statement ok
DROP TABLE upsert_avro_json;

# statement ok
# DROP TABLE upsert_avro_json;

statement ok
DROP TABLE upsert_avro_json2;

statement ok
DROP TABLE upsert_student_avro_json;
3 changes: 2 additions & 1 deletion proto/catalog.proto
@@ -55,7 +55,8 @@ message StreamSourceInfo {
string proto_message_name = 4;
int32 csv_delimiter = 5;
bool csv_has_header = 6;
string upsert_avro_primary_key = 7;
reserved 7;
reserved "upsert_avro_primary_key"; // deprecated
plan_common.FormatType format = 8;
plan_common.EncodeType row_encode = 9;
SchemaRegistryNameStrategy name_strategy = 10;
10 changes: 1 addition & 9 deletions src/connector/src/parser/avro/parser.rs
@@ -108,7 +108,6 @@ pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub upsert_primary_key_column_name: Option<String>,
}

impl AvroParserConfig {
@@ -120,12 +119,7 @@ impl AvroParserConfig {
if avro_config.use_schema_registry {
let client = Client::new(url, &avro_config.client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let upsert_primary_key_column_name =
if enable_upsert && !avro_config.upsert_primary_key.is_empty() {
Some(avro_config.upsert_primary_key.clone())
} else {
None
};

let subject_key = if enable_upsert {
Some(get_subject_by_strategy(
&avro_config.name_strategy,
@@ -157,7 +151,6 @@ impl AvroParserConfig {
None
},
schema_resolver: Some(Arc::new(resolver)),
upsert_primary_key_column_name,
})
} else {
if enable_upsert {
Expand All @@ -184,7 +177,6 @@ impl AvroParserConfig {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
upsert_primary_key_column_name: None,
})
}
}
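For readers skimming the diff, the net effect on the Avro parser config is easiest to see in isolation. Below is a minimal sketch of the struct shape after this PR; the types are stand-ins, not the real `apache_avro::Schema` or `ConfluentSchemaResolver` definitions, and only the fields visible in this hunk are shown.

#[allow(dead_code)]
mod sketch {
    use std::sync::Arc;

    // Stand-ins for apache_avro::Schema and ConfluentSchemaResolver.
    pub struct Schema;
    pub struct ConfluentSchemaResolver;

    // After this PR the parser config only carries the value/key schemas and an
    // optional schema-registry resolver; `upsert_primary_key_column_name` is gone,
    // since the primary key is now bound during column extraction in the frontend
    // rather than being threaded through the parser.
    pub struct AvroParserConfig {
        pub schema: Arc<Schema>,
        pub key_schema: Option<Arc<Schema>>,
        pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
    }
}

fn main() {
    let _config = sketch::AvroParserConfig {
        schema: std::sync::Arc::new(sketch::Schema),
        key_schema: None,
        schema_resolver: None,
    };
}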
2 changes: 0 additions & 2 deletions src/connector/src/parser/mod.rs
@@ -786,7 +786,6 @@ impl SpecificParserConfig {
pub struct AvroProperties {
pub use_schema_registry: bool,
pub row_schema_location: String,
pub upsert_primary_key: String,
pub client_config: SchemaRegistryAuth,
pub aws_auth_props: Option<AwsAuthProps>,
pub topic: String,
@@ -887,7 +886,6 @@ impl SpecificParserConfig {
.unwrap(),
use_schema_registry: info.use_schema_registry,
row_schema_location: info.row_schema_location.clone(),
upsert_primary_key: info.upsert_avro_primary_key.clone(),
..Default::default()
};
if format == SourceFormat::Upsert {
24 changes: 7 additions & 17 deletions src/connector/src/parser/upsert_parser.rs
@@ -34,7 +34,6 @@ pub struct UpsertParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
avro_primary_key_column_name: Option<String>,
}

async fn build_accessor_builder(
Expand Down Expand Up @@ -68,31 +67,25 @@ impl UpsertParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> Result<Self> {
let mut avro_primary_key_column_name = None;
let key_builder: AccessBuilderImpl;
// check whether columns has `DEFAULT_KEY_COLUMN_NAME`, if so, the key accessor should be
// bytes
if check_rw_default_key(&rw_columns) {
key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
let key_builder = if check_rw_default_key(&rw_columns) {
AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes(
BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?);
},
))?)
} else {
if let EncodingProperties::Avro(config) = &props.encoding_config {
avro_primary_key_column_name = Some(config.upsert_primary_key.clone())
}
let (key_config, key_type) = extract_key_config!(props);
key_builder = build_accessor_builder(key_config, key_type).await?;
}
build_accessor_builder(key_config, key_type).await?
};
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
Ok(Self {
key_builder,
payload_builder,
rw_columns,
source_ctx,
avro_primary_key_column_name,
})
}

@@ -113,9 +106,6 @@ impl UpsertParser {
row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
change_event_op = ChangeEventOperation::Upsert;
}
if let Some(primary_key_name) = &self.avro_primary_key_column_name {
row_op = row_op.with_key_as_column_name(primary_key_name);
}

apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op)
.map_err(Into::into)
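The key-builder selection that remains in `UpsertParser::new` after the Avro primary-key plumbing is removed can be summarized in a self-contained sketch. Stub types stand in for RisingWave's `AccessBuilderImpl` and `SourceColumnDesc`, and the `_rw_key` column name is an assumption for illustration, not taken from this diff.

#[derive(Debug)]
enum AccessBuilder {
    // Raw-bytes accessor used when the key is exposed as a single default key column.
    Bytes { column_name: String },
    // Schema-driven accessor (e.g. Avro via schema registry) used otherwise.
    Encoded { encoding: String },
}

struct ColumnDesc {
    name: String,
}

const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key";

fn check_rw_default_key(columns: &[ColumnDesc]) -> bool {
    columns.iter().any(|c| c.name == DEFAULT_KEY_COLUMN_NAME)
}

fn build_key_builder(columns: &[ColumnDesc], key_encoding: &str) -> AccessBuilder {
    // The PR turns the old `let mut key_builder; if .. else ..` statement into a
    // single `let key_builder = if .. else ..` expression, and the `else` branch no
    // longer extracts an Avro upsert primary key name.
    if check_rw_default_key(columns) {
        AccessBuilder::Bytes {
            column_name: DEFAULT_KEY_COLUMN_NAME.to_string(),
        }
    } else {
        AccessBuilder::Encoded {
            encoding: key_encoding.to_string(),
        }
    }
}

fn main() {
    let columns = vec![ColumnDesc {
        name: DEFAULT_KEY_COLUMN_NAME.to_string(),
    }];
    println!("{:?}", build_key_builder(&columns, "avro"));
}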
@@ -10,7 +10,7 @@
- id: create_source_without_schema_in_json
sql: |
create source s with(connector='kafka') FORMAT PLAIN ENCODE JSON;
planner_error: 'Invalid input syntax: schema definition is required for ENCODE JSON'
planner_error: 'Protocol error: Schema definition is required, either from SQL or schema registry.'
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -226,7 +226,7 @@ pub async fn handle_create_sink(
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::try_bind_columns_from_source`]
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};