From 70afb9d2c748be44726de01b334c0e2022259539 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 16 Apr 2024 13:41:21 -0700 Subject: [PATCH] [Source-mongodb] : Populate null values in airbyte record message (#37348) --- .../connectors/source-mongodb-v2/build.gradle | 2 +- .../connectors/source-mongodb-v2/metadata.yaml | 2 +- .../source/mongodb/cdc/MongoDbCdcEventUtils.java | 6 ++++++ .../source/mongodb/cdc/MongoDbCdcEventUtilsTest.java | 7 +++++-- docs/integrations/sources/mongodb-v2.md | 1 + 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 5a744495b8d6..8b27a61300b8 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.29.2' + cdkVersionRequired = '0.30.4' features = ['db-sources', 'datastore-mongo'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 35f5592ff9a5..2f23d76c9574 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -8,7 +8,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.3.3 + dockerImageTag: 1.3.4 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtils.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtils.java index 1e9e296a51e5..55245baf0a20 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtils.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtils.java @@ -248,6 +248,7 @@ private static ObjectNode readField(final BsonReader reader, case JAVASCRIPT -> o.put(fieldName, reader.readJavaScript()); case JAVASCRIPT_WITH_SCOPE -> readJavaScriptWithScope(o, reader, fieldName); case REGULAR_EXPRESSION -> o.put(fieldName, readRegularExpression(reader.readRegularExpression())); + case NULL -> readNull(o, reader, fieldName); default -> reader.skipValue(); } @@ -289,6 +290,11 @@ private static byte[] toByteArray(final BsonBinary value) { return value == null ? null : value.getData(); } + private static void readNull(final ObjectNode o, final BsonReader reader, final String fieldName) { + o.putNull(fieldName); + reader.readNull(); + } + private static void readJavaScriptWithScope(final ObjectNode o, final BsonReader reader, final String fieldName) { final var code = reader.readJavaScriptWithScope(); final var scope = readDocument(reader, (ObjectNode) Jsons.jsonNode(Collections.emptyMap()), Set.of("scope"), false); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtilsTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtilsTest.java index 3068668bb972..5a5b26ce4a8b 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtilsTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcEventUtilsTest.java @@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.DataTypeUtils; import io.airbyte.commons.json.Jsons; @@ -150,7 +151,8 @@ void testTransformDataTypes() { assertEquals("code2", transformed.get("field13").get("code").asText()); assertEquals("scope", transformed.get("field13").get("scope").get("scope").asText()); assertEquals("pattern", transformed.get("field14").asText()); - assertFalse(transformed.has("field15")); + assertTrue(transformed.has("field15")); + assertEquals(JsonNodeType.NULL, transformed.get("field15").getNodeType()); assertEquals("value", transformed.get("field16").get("key").asText()); // Assert that UUIDs can be serialized. Currently, they will be represented as base 64 encoded // strings. Since the original mongo source @@ -247,7 +249,8 @@ void testTransformDataTypesNoSchema() { assertTrue(abDataNode.has("field12")); assertTrue(abDataNode.has("field13")); assertTrue(abDataNode.has("field14")); - assertFalse(abDataNode.has("field15")); + assertTrue(abDataNode.has("field15")); + assertEquals(JsonNodeType.NULL, abDataNode.get("field15").getNodeType()); assertTrue(abDataNode.has("field16")); } diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index f056bbd26212..7fdf6d2cd6ca 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -221,6 +221,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.3.4 | 2024-04-16 | [37348](https://github.com/airbytehq/airbyte/pull/37348) | Populate null values in airbyte record messages. | | 1.3.3 | 2024-04-05 | [36872](https://github.com/airbytehq/airbyte/pull/36872) | Update to connector's metadat definition. | | 1.3.2 | 2024-04-04 | [36845](https://github.com/airbytehq/airbyte/pull/36845) | Adopt Kotlin CDK. | | 1.3.1 | 2024-04-04 | [36837](https://github.com/airbytehq/airbyte/pull/36837) | Adopt CDK 0.28.0. |