Skip to content

Commit

Permalink
[source-mssql/mysql/postgres] Fix and cleanup oc map (#42024)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <[email protected]>
Co-authored-by: btkcodedev <[email protected]>
Co-authored-by: Artem Inzhyyants <[email protected]>
Co-authored-by: Natik Gadzhi <[email protected]>
Co-authored-by: artem1205 <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Antonio Papa <[email protected]>
Co-authored-by: Adam Marcus <[email protected]>
Co-authored-by: Bryce Groff <[email protected]>
Co-authored-by: Akash Kulkarni <[email protected]>
Co-authored-by: Patrick Nilan <[email protected]>
  • Loading branch information
11 people authored Jul 22, 2024
1 parent 9697fa7 commit af28105
Show file tree
Hide file tree
Showing 22 changed files with 318 additions and 313 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.42.3
version=0.42.4
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
Assertions.assertEquals((MODEL_RECORDS.size), recordsFromFirstBatch.size)
assertExpectedRecords(HashSet(MODEL_RECORDS), recordsFromFirstBatch, HashSet())
}

/**
 * Extension point with an empty default body: subclasses may override it to check
 * [streamStateToBeTested].
 *
 * NOTE(review): the call site is outside this hunk — presumably invoked from the resumable
 * full refresh test; confirm.
 *
 * @param streamStateToBeTested the stream state handed to the override, as a [JsonNode]
 */
protected open fun validateStreamStateInResumableFullRefresh(streamStateToBeTested: JsonNode) {}

@Test
Expand Down Expand Up @@ -1402,6 +1403,18 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
Assertions.assertEquals(12, recordsFromFirstBatch.size)

stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }

// Test for recovery - it should be able to resume using any previous state. Using the 3rd
// state to test.
val recoveryState = Jsons.jsonNode(listOf(stateAfterFirstBatch[2]))

val recoverySyncIterator =
source().read(config()!!, fullRefreshConfiguredCatalog, recoveryState)
val dataFromRecoverySync = AutoCloseableIterators.toListAndClose(recoverySyncIterator)
val recordsFromRecoverySync = extractRecordMessages(dataFromRecoverySync)
val stateAfterRecoverySync = extractStateMessages(dataFromRecoverySync)
Assertions.assertEquals(9, stateAfterRecoverySync.size)
Assertions.assertEquals(9, recordsFromRecoverySync.size)
}

protected open fun assertStateMessagesForNewTableSnapshotTest(
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.1
dockerImageTag: 4.1.2
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ protected void initializeForStateManager(final JdbcDatabase database,
final MssqlCursorBasedStateManager cursorBasedStateManager = new MssqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialOrderedColumnLoad(cursorBasedStateManager, catalog);
initialLoadStateManager = new MssqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToOrderedColumnInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
initPairToOrderedColumnInfoMap(database, catalog, tableNameToTable, getQuoteString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadGlobalStateManager.class);
private final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;
private StateManager stateManager;
private final CdcState initialCdcState;
// Only one global state is emitted, which is fanned out into many entries in the DB by platform. As
Expand Down Expand Up @@ -145,11 +144,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withGlobal(generateGlobalState(streamStates));
}

@Override
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToOrderedColInfo.get(pair);
}

private DbStreamState getFinalState(final AirbyteStreamNameNamespacePair pair) {
Preconditions.checkNotNull(pair);
Preconditions.checkNotNull(pair.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public abstract class MssqlInitialLoadStateManager implements SourceStateMessage
public static final String STATE_TYPE_KEY = "state_type";
public static final String ORDERED_COL_STATE_TYPE = "ordered_column";
protected Map<AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> pairToOrderedColLoadStatus;
protected Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;

private OrderedColumnLoadStatus ocStatus;

Expand Down Expand Up @@ -61,7 +62,9 @@ public OrderedColumnLoadStatus getOrderedColumnLoadStatus(final AirbyteStreamNam
* @param pair pair
* @return the ordered column info associated with the pair
*/
public abstract OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair);
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
// Plain lookup in the pairToOrderedColInfo map; Map.get yields null when the pair has no entry,
// so callers receive null for an unknown pair.
return pairToOrderedColInfo.get(pair);
}

static Map<AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> initPairToOrderedColumnLoadStatusMap(
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> pairToOcStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
public class MssqlInitialLoadStreamStateManager extends MssqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadStateManager.class);
private final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;

public MssqlInitialLoadStreamStateManager(final ConfiguredAirbyteCatalog catalog,
final InitialLoadStreams initialLoadStreams,
Expand Down Expand Up @@ -51,11 +50,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withStream(getAirbyteStreamState(pair, finalState));
}

@Override
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToOrderedColInfo.get(pair);
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
AirbyteStreamNameNamespacePair pair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static MssqlInitialLoadGlobalStateManager getMssqlInitialLoadGlobalStateM
final CdcState initialStateToBeUsed = getCdcState(database, catalog, stateManager, savedOffsetStillPresentOnServer);

return new MssqlInitialLoadGlobalStateManager(initialLoadStreams,
initPairToOrderedColumnInfoMap(database, initialLoadStreams, tableNameToTable, quoteString),
initPairToOrderedColumnInfoMap(database, catalog, tableNameToTable, quoteString),
stateManager, catalog, initialStateToBeUsed);
}

Expand Down Expand Up @@ -389,13 +389,13 @@ public static InitialLoadStreams cdcStreamsForInitialOrderedColumnLoad(final Cdc

public static Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> initPairToOrderedColumnInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOcInfoMap = new HashMap<>();
// For every stream that is in initial ordered column sync, we want to maintain information about
// the current ordered column info associated with the stream
initialLoadStreams.streamsForInitialLoad.forEach(stream -> {
catalog.getStreams().forEach(stream -> {
final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final Optional<OrderedColumnInfo> ocInfo = getOrderedColumnInfo(database, stream, tableNameToTable, quoteString);
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.6.2
dockerImageTag: 3.6.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected void initializeForStateManager(final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
initialLoadStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
initPairToPrimaryKeyInfoMap(database, catalog, tableNameToTable, getQuoteString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -156,11 +155,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withGlobal(generateGlobalState(streamStates));
}

@Override
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyLoadStatus.get(pair);
}

@Override
public PrimaryKeyInfo getPrimaryKeyInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyInfo.get(pair);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair,

// Returns the previous state emitted, represented as a {@link PrimaryKeyLoadStatus} associated with
// the stream.
public abstract PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair);
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair) {
// Direct lookup keyed by the stream's name/namespace pair.
// NOTE(review): pairToPrimaryKeyLoadStatus is declared outside this hunk — presumably a
// java.util.Map, in which case a pair with no entry yields null; confirm.
return pairToPrimaryKeyLoadStatus.get(pair);
}

// Returns the current {@link PrimaryKeyInfo}, associated with the stream. This includes the data type &
// the column name associated with the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
Expand Down Expand Up @@ -57,11 +56,6 @@ public PrimaryKeyInfo getPrimaryKeyInfo(final io.airbyte.protocol.models.Airbyte
return pairToPrimaryKeyInfo.get(pair);
}

@Override
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyLoadStatus.get(pair);
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static MySqlInitialLoadGlobalStateManager getMySqlInitialLoadGlobalStateM
cdcStreamsForInitialPrimaryKeyLoad(stateManager.getCdcStateManager(), catalog, savedOffsetStillPresentOnServer);

return new MySqlInitialLoadGlobalStateManager(initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString),
initPairToPrimaryKeyInfoMap(database, catalog, tableNameToTable, quoteString),
stateManager, catalog, savedOffsetStillPresentOnServer, getDefaultCdcState(database, catalog));
}

Expand Down Expand Up @@ -508,14 +508,14 @@ public static List<ConfiguredAirbyteStream> identifyStreamsForCursorBased(final
// currently undergoing initial primary key syncs.
public static Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> initPairToPrimaryKeyInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final String quoteString) {
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPkInfoMap = new HashMap<>();
// For every stream that was in primary initial key sync, we want to maintain information about the
// current primary key info associated with the
// stream
initialLoadStreams.streamsForInitialLoad().forEach(stream -> {
catalog.getStreams().forEach(stream -> {
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair =
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final Optional<PrimaryKeyInfo> pkInfo = getPrimaryKeyInfo(database, stream, tableNameToTable, quoteString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.4
dockerImageTag: 3.6.5
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand All @@ -30,12 +31,13 @@ public abstract class CtidStateManager implements SourceStateMessageProducer<Air
protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
protected Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

protected String lastCtid;
protected Map<AirbyteStreamNameNamespacePair, String> pairToLastCtid;
protected FileNodeHandler fileNodeHandler;

/**
 * Stores the given per-stream ctid statuses and sets up the remaining defaults.
 *
 * @param pairToCtidStatus ctid status keyed by stream name/namespace pair; the reference is kept
 *        as-is (no copy is made)
 */
protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
this.pairToCtidStatus = pairToCtidStatus;
// Default supplier: returns Jsons.emptyObject() for every pair.
this.streamStateForIncrementalRunSupplier = namespacePair -> Jsons.emptyObject();
// Last ctid seen per stream; starts empty.
this.pairToLastCtid = new HashMap<>();
}

public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) {
Expand Down Expand Up @@ -81,6 +83,7 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final String lastCtid = pairToLastCtid.get(pair);
// If the table is empty, lastCtid will be set to zero for the final state message.
final String lastCtidInState = (Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
Expand All @@ -98,7 +101,9 @@ protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespace
@Override
public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, AirbyteMessageWithCtid message) {
if (Objects.nonNull(message.ctid())) {
this.lastCtid = message.ctid();
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
pairToLastCtid.put(pair, message.ctid());
}
return message.recordMessage();
}
Expand All @@ -121,6 +126,9 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
*/
@Override
public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {
  // Look up the last ctid recorded for this stream, keyed by its (name, namespace) pair.
  final String mostRecentCtid = pairToLastCtid.get(
      new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()));

  // Nothing recorded for this stream yet: no state message.
  if (Objects.isNull(mostRecentCtid)) {
    return false;
  }
  // A recorded ctid only counts when it is not blank.
  return StringUtils.isNotBlank(mostRecentCtid);
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix an NPE bug on resuming from a failed attempt. |
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. |
Expand Down
Loading

0 comments on commit af28105

Please sign in to comment.