diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index d1d46d6924204..e45fdded17c71 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -50,11 +50,14 @@ public class DbzCdcEventConsumer this.outputChannel = store; this.heartbeatTopicPrefix = heartbeatTopicPrefix; - var jsonConverter = new JsonConverter(); + // The default JSON converter will output the schema field in the JSON which is unnecessary + // to source parser, we use a customized JSON converter to avoid outputting the `schema` + // field. + var jsonConverter = new DbzJsonConverter(); final HashMap configs = new HashMap<>(2); // only serialize the value part configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); - // include record schema + // include record schema to output JSON in { "schema": { ... }, "payload": { ... } } format configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true); jsonConverter.configure(configs); this.converter = jsonConverter; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java new file mode 100644 index 0000000000000..11fcdcba55553 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzJsonConverter.java @@ -0,0 +1,39 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.core; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.json.JsonConverter; + +/** + * Customize the JSON converter to avoid outputting the `schema` field but retian `payload` field in + * the JSON output. e.g. + * + *
+ * {
+ *     "schema": null,
+ *     "payload": {
+ *     	...
+ *     }
+ * }
+ * 
+ */ +public class DbzJsonConverter extends JsonConverter { + @Override + public ObjectNode asJsonSchema(Schema schema) { + return null; + } +}