Skip to content

Commit

Permalink
fix(cdc): avoid serde the schema field in the Debzium connector (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Nov 2, 2023
1 parent 1211f9d commit 4daf642
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ public class DbzCdcEventConsumer
this.outputChannel = store;
this.heartbeatTopicPrefix = heartbeatTopicPrefix;

var jsonConverter = new JsonConverter();
// The default JSON converter will output the schema field in the JSON which is unnecessary
// to source parser, we use a customized JSON converter to avoid outputting the `schema`
// field.
var jsonConverter = new DbzJsonConverter();
final HashMap<String, Object> configs = new HashMap<>(2);
// only serialize the value part
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
// include record schema
// include record schema to output JSON in { "schema": { ... }, "payload": { ... } } format
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
jsonConverter.configure(configs);
this.converter = jsonConverter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.core;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

/**
* Customize the JSON converter to avoid outputting the `schema` field but retian `payload` field in
* the JSON output. e.g.
*
* <pre>
* {
* "schema": null,
* "payload": {
* ...
* }
* }
* </pre>
*/
public class DbzJsonConverter extends JsonConverter {
@Override
public ObjectNode asJsonSchema(Schema schema) {
return null;
}
}

0 comments on commit 4daf642

Please sign in to comment.