Commit

refactor(source): remove upsert_avro_primary_key and separate extracting columns and bind pk (#13023)
yuhao-su authored Oct 30, 2023
1 parent e392db0 commit 28b0ef2
Showing 13 changed files with 391 additions and 431 deletions.
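Judging from the diff below, the user-facing `upsert_avro_primary_key` option is dropped: for `FORMAT UPSERT ENCODE AVRO`, key columns are derived from the Kafka key schema (or the default key column), and binding the declared `PRIMARY KEY` happens as a separate step. A rough sketch of that split, using made-up helper names (`extract_key_columns`, `bind_primary_key`) that do not appear in this commit:

```rust
// Illustrative sketch only -- these helpers and types are not from the commit.
struct ResolvedSchemas {
    key_schema: Option<Vec<String>>, // field names of the Avro key record, if any
}

// Placeholder value; the real DEFAULT_KEY_COLUMN_NAME lives in the connector crate.
const DEFAULT_KEY_COLUMN_NAME: &str = "default_key";

/// Step 1: derive the key columns from the key schema itself,
/// instead of from a user-supplied `upsert_avro_primary_key`.
fn extract_key_columns(schemas: &ResolvedSchemas) -> Vec<String> {
    match &schemas.key_schema {
        Some(fields) => fields.clone(),
        None => vec![DEFAULT_KEY_COLUMN_NAME.to_string()],
    }
}

/// Step 2: bind the PRIMARY KEY declared in SQL against those columns.
fn bind_primary_key(declared_pk: &[String], key_columns: &[String]) -> Result<(), String> {
    if declared_pk == key_columns {
        Ok(())
    } else {
        Err(format!(
            "PRIMARY KEY {declared_pk:?} does not match key columns {key_columns:?}"
        ))
    }
}

fn main() {
    let schemas = ResolvedSchemas {
        key_schema: Some(vec!["ID".to_string()]),
    };
    let key_columns = extract_key_columns(&schemas);
    bind_primary_key(&["ID".to_string()], &key_columns).expect("PK should bind");
}
```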
2 changes: 1 addition & 1 deletion e2e_test/source/basic/ddl.slt
@@ -28,7 +28,7 @@ create source invalid_startup_timestamp (
properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;

statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON
statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry.
create source invalid_schema_definition
with (
connector = 'kafka',
78 changes: 25 additions & 53 deletions e2e_test/source/basic/nosim_kafka.slt
@@ -30,26 +30,17 @@ WITH (
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');


statement ok
CREATE TABLE upsert_avro_json (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
# TODO: Uncomment this when we add test data kafka key with format `"ID":id`
# statement ok
# CREATE TABLE upsert_avro_json (
# PRIMARY KEY("ID")
# )
# WITH (
# connector = 'kafka',
# properties.bootstrap.server = 'message_queue:29092',
# topic = 'upsert_avro_json')
# FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');

# Just ignore the kafka key, it works
statement ok
CREATE TABLE upsert_avro_json2 (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');

statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
@@ -89,7 +80,7 @@ statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 5s
sleep 8s

query II
SELECT
@@ -104,33 +95,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
*
FROM
upsert_avro_json
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
SELECT
*
FROM
upsert_avro_json2
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

# query II
# SELECT
# *
# FROM
# upsert_avro_json
# ORDER BY
# "ID";
# ----
# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
@@ -172,12 +148,8 @@ select * from kafka_json_schema_upsert order by id
statement ok
DROP TABLE upsert_avro_json_default_key;

statement ok
DROP TABLE upsert_avro_json;


statement ok
DROP TABLE upsert_avro_json2;
# statement ok
# DROP TABLE upsert_avro_json;

statement ok
DROP TABLE upsert_student_avro_json;
2 changes: 1 addition & 1 deletion e2e_test/source/basic/old_row_format_syntax/ddl.slt
@@ -28,7 +28,7 @@ create source invalid_startup_timestamp (
properties.bootstrap.server = 'message_queue:29092'
) ROW FORMAT JSON;

statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON
statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry.
create source invalid_schema_definition
with (
connector = 'kafka',
81 changes: 26 additions & 55 deletions e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt
@@ -33,28 +33,17 @@ ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'


statement ok
CREATE TABLE upsert_avro_json (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

# Just ignore the kafka key, it works
statement ok
CREATE TABLE upsert_avro_json2 (
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'
# TODO: Uncomment this when we add test data kafka key with format `"ID":id`
# statement ok
# CREATE TABLE upsert_avro_json (
# PRIMARY KEY("ID")
# )
# WITH (
# connector = 'kafka',
# properties.bootstrap.server = 'message_queue:29092',
# topic = 'upsert_avro_json')
# ROW FORMAT UPSERT_AVRO
# row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
@@ -65,7 +54,6 @@ CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081';



statement ok
CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
@@ -78,7 +66,7 @@ statement ok
flush;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 5s
sleep 10s

query II
SELECT
@@ -93,32 +81,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z

query II
SELECT
*
FROM
upsert_avro_json
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
SELECT
*
FROM
upsert_avro_json2
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# query II
# SELECT
# *
# FROM
# upsert_avro_json
# ORDER BY
# "ID";
# ----
# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
@@ -150,12 +124,9 @@ select count(*) from debezium_compact;
statement ok
DROP TABLE upsert_avro_json_default_key;

statement ok
DROP TABLE upsert_avro_json;

# statement ok
# DROP TABLE upsert_avro_json;

statement ok
DROP TABLE upsert_avro_json2;

statement ok
DROP TABLE upsert_student_avro_json;
3 changes: 2 additions & 1 deletion proto/catalog.proto
@@ -55,7 +55,8 @@ message StreamSourceInfo {
string proto_message_name = 4;
int32 csv_delimiter = 5;
bool csv_has_header = 6;
string upsert_avro_primary_key = 7;
reserved 7;
reserved "upsert_avro_primary_key"; // deprecated
plan_common.FormatType format = 8;
plan_common.EncodeType row_encode = 9;
SchemaRegistryNameStrategy name_strategy = 10;
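Reserving tag 7 and the field name in `StreamSourceInfo` means neither can be reused by a future field, and the prost-generated Rust struct simply stops having `upsert_avro_primary_key`, so any leftover reference fails to compile. A toy illustration (not the real generated code) of why the Rust changes below follow mechanically:

```rust
// Toy stand-in for the prost-generated message; not the real definition.
#[derive(Default)]
struct StreamSourceInfo {
    row_schema_location: String,
    use_schema_registry: bool,
    // upsert_avro_primary_key was field 7; after `reserved 7;` it is no longer generated.
}

fn main() {
    let info = StreamSourceInfo::default();
    // `info.upsert_avro_primary_key` would now be a compile error,
    // which is what forces the clean-up in the parser code below.
    println!("{} {}", info.row_schema_location, info.use_schema_registry);
}
```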
10 changes: 1 addition & 9 deletions src/connector/src/parser/avro/parser.rs
@@ -108,7 +108,6 @@ pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
pub upsert_primary_key_column_name: Option<String>,
}

impl AvroParserConfig {
@@ -120,12 +119,7 @@ impl AvroParserConfig {
if avro_config.use_schema_registry {
let client = Client::new(url, &avro_config.client_config)?;
let resolver = ConfluentSchemaResolver::new(client);
let upsert_primary_key_column_name =
if enable_upsert && !avro_config.upsert_primary_key.is_empty() {
Some(avro_config.upsert_primary_key.clone())
} else {
None
};

let subject_key = if enable_upsert {
Some(get_subject_by_strategy(
&avro_config.name_strategy,
@@ -157,7 +151,6 @@ impl AvroParserConfig {
None
},
schema_resolver: Some(Arc::new(resolver)),
upsert_primary_key_column_name,
})
} else {
if enable_upsert {
@@ -184,7 +177,6 @@ impl AvroParserConfig {
schema: Arc::new(schema),
key_schema: None,
schema_resolver: None,
upsert_primary_key_column_name: None,
})
}
}
2 changes: 0 additions & 2 deletions src/connector/src/parser/mod.rs
@@ -794,7 +794,6 @@ impl SpecificParserConfig {
pub struct AvroProperties {
pub use_schema_registry: bool,
pub row_schema_location: String,
pub upsert_primary_key: String,
pub client_config: SchemaRegistryAuth,
pub aws_auth_props: Option<AwsAuthProps>,
pub topic: String,
@@ -895,7 +894,6 @@ impl SpecificParserConfig {
.unwrap(),
use_schema_registry: info.use_schema_registry,
row_schema_location: info.row_schema_location.clone(),
upsert_primary_key: info.upsert_avro_primary_key.clone(),
..Default::default()
};
if format == SourceFormat::Upsert {
24 changes: 7 additions & 17 deletions src/connector/src/parser/upsert_parser.rs
@@ -34,7 +34,6 @@ pub struct UpsertParser {
payload_builder: AccessBuilderImpl,
pub(crate) rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
avro_primary_key_column_name: Option<String>,
}

async fn build_accessor_builder(
Expand Down Expand Up @@ -68,31 +67,25 @@ impl UpsertParser {
rw_columns: Vec<SourceColumnDesc>,
source_ctx: SourceContextRef,
) -> Result<Self> {
let mut avro_primary_key_column_name = None;
let key_builder: AccessBuilderImpl;
// check whether columns has `DEFAULT_KEY_COLUMN_NAME`, if so, the key accessor should be
// bytes
if check_rw_default_key(&rw_columns) {
key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
EncodingProperties::Bytes(BytesProperties {
let key_builder = if check_rw_default_key(&rw_columns) {
AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes(
BytesProperties {
column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()),
}),
)?);
},
))?)
} else {
if let EncodingProperties::Avro(config) = &props.encoding_config {
avro_primary_key_column_name = Some(config.upsert_primary_key.clone())
}
let (key_config, key_type) = extract_key_config!(props);
key_builder = build_accessor_builder(key_config, key_type).await?;
}
build_accessor_builder(key_config, key_type).await?
};
let payload_builder =
build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
Ok(Self {
key_builder,
payload_builder,
rw_columns,
source_ctx,
avro_primary_key_column_name,
})
}

@@ -113,9 +106,6 @@ impl UpsertParser {
row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
change_event_op = ChangeEventOperation::Upsert;
}
if let Some(primary_key_name) = &self.avro_primary_key_column_name {
row_op = row_op.with_key_as_column_name(primary_key_name);
}

apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op)
.map_err(Into::into)
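The `UpsertParser` change above is mostly a mechanical tightening: the key accessor is now chosen in a single `let ... = if ... else ...` expression, and the Avro-specific primary-key rename (`with_key_as_column_name`) is gone. A simplified sketch of the selection pattern with stand-in types, not the actual RisingWave builders:

```rust
// Stand-in types; the real code uses AccessBuilderImpl and BytesAccessBuilder.
enum KeyAccessBuilder {
    Bytes { column_name: String },
    SchemaBased { registry_url: String },
}

fn build_key_builder(has_default_key_column: bool, registry_url: &str) -> KeyAccessBuilder {
    // Same shape as the diff: one expression, no mutable placeholder,
    // and no special case for an Avro primary-key column.
    if has_default_key_column {
        KeyAccessBuilder::Bytes {
            // Placeholder for DEFAULT_KEY_COLUMN_NAME in the connector crate.
            column_name: "default_key".to_string(),
        }
    } else {
        KeyAccessBuilder::SchemaBased {
            registry_url: registry_url.to_string(),
        }
    }
}

fn main() {
    match build_key_builder(false, "http://message_queue:8081") {
        KeyAccessBuilder::Bytes { column_name } => println!("raw bytes key -> {column_name}"),
        KeyAccessBuilder::SchemaBased { registry_url } => println!("key decoded via {registry_url}"),
    }
}
```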
@@ -10,7 +10,7 @@
- id: create_source_without_schema_in_json
sql: |
create source s with(connector='kafka') FORMAT PLAIN ENCODE JSON;
planner_error: 'Invalid input syntax: schema definition is required for ENCODE JSON'
planner_error: 'Protocol error: Schema definition is required, either from SQL or schema registry.'
- id: csv_delimiter_tab
sql: |
explain create table s0 (v1 int, v2 varchar) with (
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -226,7 +226,7 @@ pub async fn handle_create_sink(
}

/// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`.
/// This is an analogy to (part of) [`crate::handler::create_source::try_bind_columns_from_source`]
/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`]
/// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`.
fn bind_sink_format_desc(value: ConnectorSchema) -> Result<SinkFormatDesc> {
use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat};
