Skip to content

Commit

Permalink
[Source-postgres] : Source operations suport for meta column (#36432)
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Mar 26, 2024
1 parent 0f6214a commit 9dee837
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 107 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.27.6 | 2024-03-26 | [\#36432](https://github.com/airbytehq/airbyte/pull/36432) | Sources support for AirbyteRecordMessageMeta during reading source data types. |
| 0.27.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Destinations: Handle case-sensitive columns in destination state handling. |
| 0.27.4 | 2024-03-25 | [\#36333](https://github.com/airbytehq/airbyte/pull/36333) | Sunset DebeziumSourceDecoratingIterator. |
| 0.27.1 | 2024-03-22 | [\#36296](https://github.com/airbytehq/airbyte/pull/36296) | Destinations: (async framework) Do not log invalid message data. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperations<ResultSet, SourceType> {

AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException;

/**
* Read from a result set, and copy the value of the column at colIndex to the Json object.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
Expand All @@ -28,19 +32,48 @@
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.slf4j.LoggerFactory;

/**
* Source operation skeleton for JDBC compatible databases.
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations.class);

/**
* A Date representing the earliest date in CE. Any date before this is in BCE.
*/
private static final Date ONE_CE = Date.valueOf("0001-01-01");

public AirbyteRecordData convertDatabaseRowToAirbyteRecordData(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
final List<AirbyteRecordMessageMetaChange> metaChanges = new ArrayList<>();

for (int i = 1; i <= columnCount; i++) {
final String columnName = queryContext.getMetaData().getColumnName(i);
try {
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode);
} catch (Exception e) {
LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.getMessage());
metaChanges.add(
new AirbyteRecordMessageMetaChange()
.withField(columnName)
.withChange(Change.NULLED)
.withReason(Reason.SOURCE_SERIALIZATION_ERROR));
}
}

return new AirbyteRecordData(jsonNode, new AirbyteRecordMessageMeta().withChanges(metaChanges));
}

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;

public record AirbyteRecordData(JsonNode rawRowData, AirbyteRecordMessageMeta meta) {}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.27.5
version=0.27.6
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ public interface CdcMetadataInjector<T> {
*/
void addMetaData(ObjectNode event, JsonNode source);

// TODO : Remove this - it is deprecated.
default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record, final String transactionTimestamp, final T metadataToAdd) {
throw new RuntimeException("Not Supported");
}

default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record) {
throw new RuntimeException("Not Supported");
}

/**
* As part of Airbyte record we need to add the namespace (schema name)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -36,6 +35,7 @@
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.db.SqlDatabase;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.db.jdbc.StreamingJdbcDatabase;
Expand Down Expand Up @@ -104,29 +104,43 @@ public AbstractJdbcSource(final String driverClass,
}

@Override
protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField) {
protected AutoCloseableIterator<AirbyteRecordData> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField) {
LOGGER.info("Queueing query for table: {}", tableName);
// This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the records
// matters
// as intermediate state messages are emitted (if the connector emits intermediate state).
if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) {
final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString());
return queryTable(database, String.format("SELECT %s FROM %s ORDER BY %s ASC",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()), quotedCursorField),
tableName, schemaName);
} else {
// If we are in FULL_REFRESH mode, state messages are never emitted, so we don't care about ordering
// of the records.
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString())), tableName, schemaName);
}
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());

final String wrappedColumnNames = getWrappedColumnNames(database, connection, columnNames, schemaName, tableName);
final StringBuilder sql = new StringBuilder(String.format("SELECT %s FROM %s",
wrappedColumnNames,
fullTableName));
// if the connector emits intermediate states, the incremental query must be sorted by the cursor
// field
if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) {
final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString());
sql.append(String.format(" ORDER BY %s ASC", quotedCursorField));
}

final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString());
LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement);
return preparedStatement;
},
sourceOperations::convertDatabaseRowToAirbyteRecordData);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}, airbyteStream);
}

/**
Expand Down Expand Up @@ -322,18 +336,18 @@ public boolean isCursorType(final Datatype type) {
}

@Override
public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final Datatype cursorFieldType) {
public AutoCloseableIterator<AirbyteRecordData> queryTableIncremental(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final Datatype cursorFieldType) {
LOGGER.info("Queueing query for table: {}", tableName);
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair airbyteStream =
AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
final Stream<JsonNode> stream = database.unsafeQuery(
final Stream<AirbyteRecordData> stream = database.unsafeQuery(
connection -> {
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString());
Expand Down Expand Up @@ -370,7 +384,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor());
return preparedStatement;
},
sourceOperations::rowToJson);
sourceOperations::convertDatabaseRowToAirbyteRecordData);
return AutoCloseableIterators.fromStream(stream, airbyteStream);
} catch (final SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import datadog.trace.api.Trace;
import io.airbyte.cdk.db.AbstractDatabase;
import io.airbyte.cdk.db.IncrementalUtils;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.JdbcConnector;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
Expand Down Expand Up @@ -43,6 +44,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
Expand Down Expand Up @@ -466,7 +468,7 @@ private AutoCloseableIterator<AirbyteMessage> getIncrementalStream(final Databas
table.getFields().stream().anyMatch(f -> f.getName().equals(cursorField)),
String.format("Could not find cursor field %s in table %s", cursorField, table.getName()));

final AutoCloseableIterator<JsonNode> queryIterator = queryTableIncremental(
final AutoCloseableIterator<AirbyteRecordData> queryIterator = queryTableIncremental(
database,
selectedDatabaseFields,
table.getNameSpace(),
Expand Down Expand Up @@ -498,26 +500,31 @@ private AutoCloseableIterator<AirbyteMessage> getFullRefreshStream(final Databas
final Instant emittedAt,
final SyncMode syncMode,
final Optional<String> cursorField) {
final AutoCloseableIterator<JsonNode> queryStream =
final AutoCloseableIterator<AirbyteRecordData> queryStream =
queryTableFullRefresh(database, selectedDatabaseFields, table.getNameSpace(),
table.getName(), syncMode, cursorField);
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
}

private static AutoCloseableIterator<AirbyteMessage> getMessageIterator(
final AutoCloseableIterator<JsonNode> recordIterator,
final AutoCloseableIterator<AirbyteRecordData> recordIterator,
final String streamName,
final String namespace,
final long emittedAt) {
return AutoCloseableIterators.transform(recordIterator,
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(streamName, namespace),
r -> new AirbyteMessage()
airbyteRecordData -> new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
.withData(airbyteRecordData.rawRowData())
.withMeta(isMetaChangesEmptyOrNull(airbyteRecordData.meta()) ? null : airbyteRecordData.meta())));
}

private static boolean isMetaChangesEmptyOrNull(AirbyteRecordMessageMeta meta) {
return meta == null || meta.getChanges() == null || meta.getChanges().isEmpty();
}

/**
Expand Down Expand Up @@ -649,12 +656,12 @@ protected abstract Map<String, List<String>> discoverPrimaryKeys(Database databa
* @param syncMode The sync mode that this full refresh stream should be associated with.
* @return iterator with read data
*/
protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Database database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField);
protected abstract AutoCloseableIterator<AirbyteRecordData> queryTableFullRefresh(final Database database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final SyncMode syncMode,
final Optional<String> cursorField);

/**
* Read incremental data from a table. Incremental read should return only records where cursor
Expand All @@ -664,12 +671,12 @@ protected abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final D
*
* @return iterator with read data
*/
protected abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
List<String> columnNames,
String schemaName,
String tableName,
CursorInfo cursorInfo,
DataType cursorFieldType);
protected abstract AutoCloseableIterator<AirbyteRecordData> queryTableIncremental(Database database,
List<String> columnNames,
String schemaName,
String tableName,
CursorInfo cursorInfo,
DataType cursorFieldType);

/**
* When larger than 0, the incremental iterator will emit intermediate state for every N records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.27.4'
cdkVersionRequired = '0.27.6'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Loading

0 comments on commit 9dee837

Please sign in to comment.