From 7f6cac5c6d6f32780b8e919bee1629707597bd65 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 2 Nov 2023 16:37:37 +0800 Subject: [PATCH 1/2] avoid output `schema` field --- .../source/core/DbzCdcEventConsumer.java | 7 +++- .../source/core/DbzJsonConverter.java | 39 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index d1d46d6924204..713edc95f2a74 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -50,11 +50,14 @@ public class DbzCdcEventConsumer this.outputChannel = store; this.heartbeatTopicPrefix = heartbeatTopicPrefix; - var jsonConverter = new JsonConverter(); + // The default JSON converter will output the schema field in the JSON which is unnecessary + // to source parser, we use a customized JSON converter to avoid outputing the `schema` + // field. + var jsonConverter = new DbzJsonConverter(); final HashMap configs = new HashMap<>(2); // only serialize the value part configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); - // include record schema + // include record schema to output JSON in { "schema": { ... }, "payload": { ... } } format configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true); jsonConverter.configure(configs); this.converter = jsonConverter; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java new file mode 100644 index 0000000000000..3cd2c5c5fb081 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java @@ -0,0 +1,39 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.core; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.json.JsonConverter; + +/** + * Customize the JSON converter to avoid outputing the `schema` field but retian `payload` field in + * the JSON output. e.g. + * + *
+ * {
+ *     "schema": null,
+ *     "payload": {
+ *     	...
+ *     }
+ * }
+ * 
+ */ +public class DbzJsonConverter extends JsonConverter { + @Override + public ObjectNode asJsonSchema(Schema schema) { + return null; + } +} From c4dc70082a07787a23439fcb5d8c602d0b9d2f36 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 2 Nov 2023 17:00:04 +0800 Subject: [PATCH 2/2] fix typo --- .../risingwave/connector/source/core/DbzCdcEventConsumer.java | 2 +- .../com/risingwave/connector/source/core/DbzJsonConverter.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index 713edc95f2a74..e45fdded17c71 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -51,7 +51,7 @@ public class DbzCdcEventConsumer this.heartbeatTopicPrefix = heartbeatTopicPrefix; // The default JSON converter will output the schema field in the JSON which is unnecessary - // to source parser, we use a customized JSON converter to avoid outputing the `schema` + // to source parser, we use a customized JSON converter to avoid outputting the `schema` // field. var jsonConverter = new DbzJsonConverter(); final HashMap configs = new HashMap<>(2); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java index 3cd2c5c5fb081..11fcdcba55553 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java @@ -19,7 +19,7 @@ import org.apache.kafka.connect.json.JsonConverter; /** - * Customize the JSON converter to avoid outputing the `schema` field but retian `payload` field in + * Customize the JSON converter to avoid outputting the `schema` field but retian `payload` field in * the JSON output. e.g. * *