diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 486498725aa6..cd5cb74face4 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,8 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.35.11 | 2024-05-23 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Fix an error in the previous release. | +| 0.35.10 | 2024-05-23 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Add shared code for db sources stream status trace messages and testing. | | 0.35.8 | 2024-05-22 | [\#38572](https://github.com/airbytehq/airbyte/pull/38572) | Add a temporary static method to decouple SnowflakeDestination from AbstractJdbcDestination | | 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. 
| | 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt index 08da71007ba9..e32b6f68d517 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt @@ -126,7 +126,7 @@ object AirbyteTraceMessageUtility { ) } - private fun makeStreamStatusTraceAirbyteMessage( + fun makeStreamStatusTraceAirbyteMessage( airbyteStreamStatusHolder: AirbyteStreamStatusHolder ): AirbyteMessage { return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage()) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 979d376b3a22..c4ad7d983048 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.35.8 +version=0.35.11 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 5fa961038cdb..50ed17d71eec 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -60,6 +60,7 @@ import 
io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode +import io.airbyte.protocol.models.v0.SyncMode.* import io.github.oshai.kotlinlogging.KotlinLogging import java.sql.Connection import java.sql.PreparedStatement @@ -126,31 +127,45 @@ abstract class AbstractJdbcSource( syncMode: SyncMode, cursorField: Optional ): AutoCloseableIterator { - if ( - supportResumableFullRefresh(database, airbyteStream) && - syncMode == SyncMode.FULL_REFRESH - ) { + if (supportResumableFullRefresh(database, airbyteStream) && syncMode == FULL_REFRESH) { val initialLoadHandler = getInitialLoadHandler(database, airbyteStream, catalog, stateManager) ?: throw IllegalStateException( "Must provide initialLoadHandler for resumable full refresh." ) - return initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now()) + return augmentWithStreamStatus( + airbyteStream, + initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now()) + ) } // If flag is off, fall back to legacy non-resumable refresh - return super.getFullRefreshStream( - database, - airbyteStream, - catalog, - stateManager, - namespace, - selectedDatabaseFields, - table, - emittedAt, - syncMode, - cursorField, - ) + var iterator = + super.getFullRefreshStream( + database, + airbyteStream, + catalog, + stateManager, + namespace, + selectedDatabaseFields, + table, + emittedAt, + syncMode, + cursorField, + ) + + return when (airbyteStream.syncMode) { + FULL_REFRESH -> augmentWithStreamStatus(airbyteStream, iterator) + else -> iterator + } + } + + open fun augmentWithStreamStatus( + airbyteStream: ConfiguredAirbyteStream, + streamItrator: AutoCloseableIterator + ): AutoCloseableIterator { + // no-op + return streamItrator } override fun queryTableFullRefresh( @@ -192,9 +207,7 @@ abstract class AbstractJdbcSource( // if the connector 
emits intermediate states, the incremental query // must be sorted by the cursor // field - if ( - syncMode == SyncMode.INCREMENTAL && stateEmissionFrequency > 0 - ) { + if (syncMode == INCREMENTAL && stateEmissionFrequency > 0) { val quotedCursorField: String = enquoteIdentifier(cursorField.get(), quoteString) sql.append(" ORDER BY $quotedCursorField ASC") @@ -704,7 +717,7 @@ abstract class AbstractJdbcSource( HashSet(Sets.difference(allStreams, alreadySyncedStreams)) return catalog.streams - .filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL } + .filter { c: ConfiguredAirbyteStream -> c.syncMode == INCREMENTAL } .filter { stream: ConfiguredAirbyteStream -> newlyAddedStreams.contains( AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream) @@ -737,4 +750,27 @@ abstract class AbstractJdbcSource( return result } } + + override fun createReadIterator( + database: JdbcDatabase, + airbyteStream: ConfiguredAirbyteStream, + catalog: ConfiguredAirbyteCatalog?, + table: TableInfo>, + stateManager: StateManager?, + emittedAt: Instant + ): AutoCloseableIterator { + val iterator = + super.createReadIterator( + database, + airbyteStream, + catalog, + table, + stateManager, + emittedAt + ) + return when (airbyteStream.syncMode) { + INCREMENTAL -> augmentWithStreamStatus(airbyteStream, iterator) + else -> iterator + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 06f5a99ab440..1e67982a5e94 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -401,7 +401,7 @@ protected constructor(driverClassName: String) : * 
@param emittedAt Time when data was emitted from the Source database
      * @return
      */
-    private fun createReadIterator(
+    protected open fun createReadIterator(
         database: Database,
         airbyteStream: ConfiguredAirbyteStream,
         catalog: ConfiguredAirbyteCatalog?,
diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/StreamStatusTraceEmitterIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/StreamStatusTraceEmitterIterator.kt
new file mode 100644
index 000000000000..8c96606bbcf6
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/streamstatus/StreamStatusTraceEmitterIterator.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.source.relationaldb.streamstatus
+
+import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage
+import io.airbyte.commons.stream.AirbyteStreamStatusHolder
+import io.airbyte.commons.util.AutoCloseableIterator
+import io.airbyte.protocol.models.v0.AirbyteMessage
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+/**
+ * Single-shot iterator that yields exactly one stream-status trace message built from
+ * [statusHolder], after which [hasNext] reports `false`.
+ */
+class StreamStatusTraceEmitterIterator(private val statusHolder: AirbyteStreamStatusHolder) :
+    AutoCloseableIterator {
+    // Flipped to true once the single status message has been handed out.
+    private var emitted = false
+
+    /** Returns `true` until the status message has been consumed via [next]. */
+    override fun hasNext(): Boolean {
+        return !emitted
+    }
+
+    /**
+     * Builds and returns the status trace message on the first call.
+     *
+     * @throws NoSuchElementException if the message has already been returned.
+     */
+    override fun next(): AirbyteMessage {
+        // Honor the Iterator contract: do not re-emit the status on repeated calls.
+        if (emitted) {
+            throw NoSuchElementException("Stream status trace message was already emitted")
+        }
+        emitted = true
+        return makeStreamStatusTraceAirbyteMessage(statusHolder)
+    }
+
+    /** Nothing to release here; closing is a no-op. */
+    @Throws(Exception::class)
+    override fun close() {
+        // no-op
+    }
+
+    /** Removal is not meaningful for a generated message, so this does nothing. */
+    override fun remove() {
+        // no-op
+    }
+
+    companion object {
+        private val LOGGER: Logger =
+            LoggerFactory.getLogger(StreamStatusTraceEmitterIterator::class.java)
+    }
+}
diff --git
a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt index 4f4dc9a40106..dc8086bafbbe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt @@ -18,9 +18,20 @@ import io.airbyte.cdk.integrations.util.HostPortResolver.resolveHost import io.airbyte.cdk.integrations.util.HostPortResolver.resolvePort import io.airbyte.cdk.testutils.TestDatabase import io.airbyte.commons.json.Jsons +import io.airbyte.commons.util.MoreIterators +import io.airbyte.protocol.models.Field +import io.airbyte.protocol.models.JsonSchemaType +import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage +import io.airbyte.protocol.models.v0.CatalogHelpers +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.DestinationSyncMode +import io.airbyte.protocol.models.v0.SyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.sql.JDBCType +import java.util.function.Consumer import java.util.function.Supplier import java.util.stream.Stream import org.jooq.SQLDialect @@ -196,6 +207,134 @@ internal class DefaultJdbcSourceAcceptanceTest : } } + @Throws(Exception::class) + override fun incrementalCursorCheck( + initialCursorField: String?, + cursorField: String, + initialCursorValue: String?, + endCursorValue: String?, + expectedRecordMessages: List, + airbyteStream: ConfiguredAirbyteStream + ) { + airbyteStream.syncMode = 
SyncMode.INCREMENTAL + airbyteStream.cursorField = java.util.List.of(cursorField) + airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND + + val configuredCatalog = + ConfiguredAirbyteCatalog().withStreams(java.util.List.of(airbyteStream)) + + val dbStreamState = buildStreamState(airbyteStream, initialCursorField, initialCursorValue) + + val actualMessages = + MoreIterators.toList( + source()!!.read( + config(), + configuredCatalog, + Jsons.jsonNode(createState(java.util.List.of(dbStreamState))), + ), + ) + + setEmittedAtToNull(actualMessages) + + val expectedStreams = + java.util.List.of(buildStreamState(airbyteStream, cursorField, endCursorValue)) + + val expectedMessages: MutableList = ArrayList(expectedRecordMessages) + expectedMessages.addAll( + createExpectedTestMessages(expectedStreams, expectedRecordMessages.size.toLong()), + ) + + setTraceEmittedAtToNull(actualMessages) + setTraceEmittedAtToNull(expectedMessages) + Assertions.assertEquals(expectedMessages.size, actualMessages.size) + Assertions.assertTrue(expectedMessages.containsAll(actualMessages)) + Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) + } + + override open fun assertStreamStatusTraceMessageIndex( + idx: Int, + allMessages: List, + expectedStreamStatus: AirbyteStreamStatusTraceMessage + ) { + // no-op + } + + @Test + @Throws(Exception::class) + override fun testReadOneColumn() { + val catalog = + CatalogHelpers.createConfiguredAirbyteCatalog( + streamName(), + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + ) + val actualMessages = MoreIterators.toList(source().read(config(), catalog, null)) + + setEmittedAtToNull(actualMessages) + + val expectedMessages: MutableList = airbyteMessagesReadOneColumn + + Assertions.assertEquals(expectedMessages.size, actualMessages.size) + Assertions.assertTrue(expectedMessages.containsAll(actualMessages)) + Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) + } + + @Test + @Throws(Exception::class) + 
override fun testReadOneTableIncrementallyTwice() { + val config = config() + val namespace = defaultNamespace + val configuredCatalog = getConfiguredCatalogWithOneStream(namespace) + configuredCatalog.streams.forEach( + Consumer { airbyteStream: ConfiguredAirbyteStream -> + airbyteStream.syncMode = SyncMode.INCREMENTAL + airbyteStream.cursorField = java.util.List.of(COL_ID) + airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND + }, + ) + + val actualMessagesFirstSync = + MoreIterators.toList( + source()!!.read( + config, + configuredCatalog, + createEmptyState(streamName(), namespace), + ), + ) + + val stateAfterFirstSyncOptional = + actualMessagesFirstSync + .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } + .first() + + executeStatementReadIncrementallyTwice() + + val actualMessagesSecondSync = + MoreIterators.toList( + source()!!.read( + config, + configuredCatalog, + extractState(stateAfterFirstSyncOptional), + ), + ) + + Assertions.assertEquals( + 2, + actualMessagesSecondSync + .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } + .count() + .toInt(), + ) + val expectedMessages: MutableList = + getExpectedAirbyteMessagesSecondSync(namespace) + + setEmittedAtToNull(actualMessagesSecondSync) + + Assertions.assertEquals(expectedMessages.size, actualMessagesSecondSync.size) + Assertions.assertTrue(expectedMessages.containsAll(actualMessagesSecondSync)) + Assertions.assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)) + } + companion object { private lateinit var PSQL_CONTAINER: PostgreSQLContainer<*> diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index af5dd37b7a1f..b8a14658b961 100644 --- 
a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -105,6 +105,29 @@ abstract class CdcSourceTest> { protected abstract fun assertExpectedStateMessages(stateMessages: List) + protected open fun assertStreamStatusTraceMessageIndex( + idx: Int, + allMessages: List, + expectedStreamStatus: AirbyteStreamStatusTraceMessage + ) { + var actualMessage = allMessages[idx] + Assertions.assertEquals(AirbyteMessage.Type.TRACE, actualMessage.type) + var traceMessage = actualMessage.trace + Assertions.assertNotNull(traceMessage.streamStatus) + Assertions.assertEquals(expectedStreamStatus, traceMessage.streamStatus) + } + + private fun createAirbteStreanStatusTraceMessage( + namespace: String, + streamName: String, + status: AirbyteStreamStatusTraceMessage.AirbyteStreamStatus + ): AirbyteStreamStatusTraceMessage { + + return AirbyteStreamStatusTraceMessage() + .withStreamDescriptor(StreamDescriptor().withNamespace(namespace).withName(streamName)) + .withStatus(status) + } + protected open fun assertExpectedStateMessagesForFullRefresh( stateMessages: List ) {} @@ -309,9 +332,7 @@ abstract class CdcSourceTest> { } protected fun extractStateMessages(messages: List): List { - return messages - .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } - .map { obj: AirbyteMessage -> obj.state } + return messages.filter { it.type == AirbyteMessage.Type.STATE }.map { it.state }.toList() } protected fun assertExpectedRecords( @@ -381,6 +402,25 @@ abstract class CdcSourceTest> { val recordMessages = extractRecordMessages(actualRecords) val stateMessages = extractStateMessages(actualRecords) + assertStreamStatusTraceMessageIndex( + 0, + actualRecords, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + 
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualRecords.size - 1, + actualRecords, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) + Assertions.assertNotNull(targetPosition) recordMessages.forEach( Consumer { record: AirbyteRecordMessage -> @@ -442,6 +482,25 @@ abstract class CdcSourceTest> { val stateMessages1 = extractStateMessages(actualRecords1) assertExpectedStateMessages(stateMessages1) + assertStreamStatusTraceMessageIndex( + 0, + actualRecords1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualRecords1.size - 1, + actualRecords1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) + updateCommand(MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11) waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1) @@ -613,10 +672,33 @@ abstract class CdcSourceTest> { configuredCatalog.withStreams(streams) val read1 = source().read(config()!!, configuredCatalog, null) - val actualRecords1 = AutoCloseableIterators.toListAndClose(read1) + val actualMessages1 = AutoCloseableIterators.toListAndClose(read1) + + // The first message will be start of the incremental stream. + // The last message will be the end of the full refresh stream. + // Index start of the incremental stream will be depending on if connector supports + // resumeable full refresh. 
+ assertStreamStatusTraceMessageIndex( + 0, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessages1.size - 1, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME_2, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) - val recordMessages1 = extractRecordMessages(actualRecords1) - val stateMessages1 = extractStateMessages(actualRecords1) + val recordMessages1 = extractRecordMessages(actualMessages1) + val stateMessages1 = extractStateMessages(actualMessages1) stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) } val names = HashSet(STREAM_NAMES) names.add(MODELS_STREAM_NAME_2) @@ -638,6 +720,25 @@ abstract class CdcSourceTest> { modelsSchema(), ) + assertStreamStatusTraceMessageIndex( + MODEL_RECORDS_2.size, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME_2, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) + assertStreamStatusTraceMessageIndex( + MODEL_RECORDS_2.size + 1, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + val state = Jsons.jsonNode(listOf(stateMessages1[stateMessages1.size - 1])) val read2 = source().read(config()!!, configuredCatalog, state) val actualRecords2 = AutoCloseableIterators.toListAndClose(read2) @@ -655,6 +756,28 @@ abstract class CdcSourceTest> { modelsSchema(), ) } else { + assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong()) + + // Expect state and record message from MODEL_RECORDS_2. 
+ assertStreamStatusTraceMessageIndex( + 2 * MODEL_RECORDS_2.size + 2, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) + assertStreamStatusTraceMessageIndex( + 2 * MODEL_RECORDS_2.size + 3, + actualMessages1, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME_2, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + assertExpectedRecords( (MODEL_RECORDS_2 + MODEL_RECORDS).toSet(), recordMessages1, @@ -676,7 +799,7 @@ abstract class CdcSourceTest> { val stateMessages2 = extractStateMessages(actualRecords2) assertExpectedRecords( - (MODEL_RECORDS_2 + puntoRecord).toSet(), + (MODEL_RECORDS_2 + listOf(puntoRecord)).toSet(), recordMessages2, setOf(MODELS_STREAM_NAME), names, @@ -795,7 +918,7 @@ abstract class CdcSourceTest> { assertExpectedStateMessagesFromIncrementalSync(stateMessages2) assertExpectedStateMessageCountMatches(stateMessages2, 1) assertExpectedRecords( - (MODEL_RECORDS_2 + puntoRecord).toSet(), + (MODEL_RECORDS_2 + listOf(puntoRecord)).toSet(), recordMessages2, setOf(MODELS_STREAM_NAME), names, @@ -830,6 +953,25 @@ abstract class CdcSourceTest> { assertExpectedRecords(emptySet(), recordMessages) assertExpectedStateMessagesForNoData(stateMessages) assertExpectedStateMessageCountMatches(stateMessages, 0) + + assertStreamStatusTraceMessageIndex( + 0, + actualRecords, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualRecords.size - 1, + actualRecords, + createAirbteStreanStatusTraceMessage( + modelsSchema(), + MODELS_STREAM_NAME, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + ) } protected open fun assertExpectedStateMessagesForNoData( diff --git 
a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index f3c2b39cd3e7..24f400a82107 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.db.factory.DatabaseDriver import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility import io.airbyte.cdk.integrations.base.Source import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils import io.airbyte.cdk.integrations.source.relationaldb.models.DbState @@ -15,10 +16,13 @@ import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState import io.airbyte.cdk.testutils.TestDatabase import io.airbyte.commons.json.Jsons import io.airbyte.commons.resources.MoreResources +import io.airbyte.commons.stream.AirbyteStreamStatusHolder import io.airbyte.commons.util.MoreIterators +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType import io.airbyte.protocol.models.v0.* +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.* import java.math.BigDecimal import java.sql.SQLException import java.util.* @@ -36,7 +40,7 @@ import org.mockito.Mockito @SuppressFBWarnings( value = ["MS_SHOULD_BE_FINAL"], justification = - "The static variables are updated in subclasses for convenience, and cannot be final." 
+ "The static variables are updated in subclasses for convenience, and cannot be final.", ) abstract class JdbcSourceAcceptanceTest> { @JvmField protected var testdb: T = createTestDatabase() @@ -87,7 +91,7 @@ abstract class JdbcSourceAcceptanceTest> { tableName, columnClause, if (primaryKeyClause == "") "" else ",", - primaryKeyClause + primaryKeyClause, ) } @@ -123,58 +127,58 @@ abstract class JdbcSourceAcceptanceTest> { createTableQuery( getFullyQualifiedTableName(TABLE_NAME), COLUMN_CLAUSE_WITH_PK, - primaryKeyClause(listOf("id")) - ) + primaryKeyClause(listOf("id")), + ), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')", - getFullyQualifiedTableName(TABLE_NAME) + getFullyQualifiedTableName(TABLE_NAME), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')", - getFullyQualifiedTableName(TABLE_NAME) + getFullyQualifiedTableName(TABLE_NAME), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME) + getFullyQualifiedTableName(TABLE_NAME), ) ?.with( createTableQuery( getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), COLUMN_CLAUSE_WITHOUT_PK, - "" - ) + "", + ), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (1, 'picard', '2004-10-19')", - getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK) + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (2, 'crusher', '2005-10-19')", - getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK) + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), ) ?.with( "INSERT INTO %s(id, name, updated_at) VALUES (3, 'vash', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK) + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), ) ?.with( createTableQuery( getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), COLUMN_CLAUSE_WITH_COMPOSITE_PK, - primaryKeyClause(listOf("first_name", "last_name")) - ) + primaryKeyClause(listOf("first_name", 
"last_name")), + ), ) ?.with( "INSERT INTO %s(first_name, last_name, updated_at) VALUES ('first', 'picard', '2004-10-19')", - getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK) + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), ) ?.with( "INSERT INTO %s(first_name, last_name, updated_at) VALUES ('second', 'crusher', '2005-10-19')", - getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK) + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), ) ?.with( "INSERT INTO %s(first_name, last_name, updated_at) VALUES ('third', 'vash', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK) + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), ) } @@ -183,6 +187,28 @@ abstract class JdbcSourceAcceptanceTest> { // timeout. } + protected open fun assertStreamStatusTraceMessageIndex( + idx: Int, + allMessages: List, + expectedStreamStatus: AirbyteStreamStatusTraceMessage + ) { + var actualMessage = allMessages[idx] + Assertions.assertEquals(AirbyteMessage.Type.TRACE, actualMessage.type) + var traceMessage = actualMessage.trace + Assertions.assertNotNull(traceMessage.streamStatus) + Assertions.assertEquals(expectedStreamStatus, traceMessage.streamStatus) + } + + fun createAirbteStreanStatusTraceMessage( + namespace: String, + streamName: String, + status: AirbyteStreamStatus + ): AirbyteStreamStatusTraceMessage { + return AirbyteStreamStatusTraceMessage() + .withStreamDescriptor(StreamDescriptor().withNamespace(namespace).withName(streamName)) + .withStatus(status) + } + @AfterEach fun tearDown() { testdb.close() @@ -232,7 +258,7 @@ abstract class JdbcSourceAcceptanceTest> { } Assertions.assertTrue( expectedStream != null, - String.format("Unexpected stream %s", actualStream.name) + String.format("Unexpected stream %s", actualStream.name), ) Assertions.assertEquals(expectedStream, actualStream) } @@ -246,11 +272,11 @@ abstract class JdbcSourceAcceptanceTest> { .with( CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY, 
getFullyQualifiedTableName(TABLE_NAME_WITHOUT_CURSOR_TYPE), - COL_CURSOR + COL_CURSOR, ) .with( INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY, - getFullyQualifiedTableName(TABLE_NAME_WITHOUT_CURSOR_TYPE) + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_CURSOR_TYPE), ) val actual = filterOutOtherSchemas(source().discover(config())) val stream = @@ -259,7 +285,7 @@ abstract class JdbcSourceAcceptanceTest> { } Assertions.assertEquals( TABLE_NAME_WITHOUT_CURSOR_TYPE.lowercase(Locale.getDefault()), - stream.name.lowercase(Locale.getDefault()) + stream.name.lowercase(Locale.getDefault()), ) Assertions.assertEquals(1, stream.supportedSyncModes.size) Assertions.assertEquals(SyncMode.FULL_REFRESH, stream.supportedSyncModes[0]) @@ -272,11 +298,11 @@ abstract class JdbcSourceAcceptanceTest> { .with( CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY, getFullyQualifiedTableName(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE), - COL_CURSOR + COL_CURSOR, ) .with( INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY, - getFullyQualifiedTableName(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE) + getFullyQualifiedTableName(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE), ) val actual = filterOutOtherSchemas(source().discover(config())) val stream = @@ -287,7 +313,7 @@ abstract class JdbcSourceAcceptanceTest> { .first() Assertions.assertEquals( TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE.lowercase(Locale.getDefault()), - stream.name.lowercase(Locale.getDefault()) + stream.name.lowercase(Locale.getDefault()), ) Assertions.assertEquals(2, stream.supportedSyncModes.size) Assertions.assertTrue(stream.supportedSyncModes.contains(SyncMode.FULL_REFRESH)) @@ -324,19 +350,19 @@ abstract class JdbcSourceAcceptanceTest> { testdb .with( "CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)", - RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME) + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME), ) .with( "INSERT INTO %s(id, name) VALUES ('1','picard')", - 
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME) + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME), ) .with( "INSERT INTO %s(id, name) VALUES ('2', 'crusher')", - RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME) + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME), ) .with( "INSERT INTO %s(id, name) VALUES ('3', 'vash')", - RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME) + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME), ) val actual = source().discover(config()) @@ -349,11 +375,11 @@ abstract class JdbcSourceAcceptanceTest> { TABLE_NAME, SCHEMA_NAME2, Field.of(COL_ID, JsonSchemaType.STRING), - Field.of(COL_NAME, JsonSchemaType.STRING) + Field.of(COL_NAME, JsonSchemaType.STRING), ) .withSupportedSyncModes( - java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) - ) + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), + ), ) expected.streams = catalogStreams // sort streams by name so that we are comparing lists with the same order. 
@@ -376,11 +402,11 @@ abstract class JdbcSourceAcceptanceTest> { MatcherAssert.assertThat( expectedMessagesResult, - Matchers.containsInAnyOrder(*actualRecordMessages.toTypedArray()) + Matchers.containsInAnyOrder(*actualRecordMessages.toTypedArray()), ) MatcherAssert.assertThat( actualRecordMessages, - Matchers.containsInAnyOrder(*expectedMessagesResult.toTypedArray()) + Matchers.containsInAnyOrder(*expectedMessagesResult.toTypedArray()), ) } @@ -391,25 +417,64 @@ abstract class JdbcSourceAcceptanceTest> { @Test @Throws(Exception::class) - protected fun testReadOneColumn() { + protected open fun testReadOneColumn() { val catalog = CatalogHelpers.createConfiguredAirbyteCatalog( streamName(), defaultNamespace, - Field.of(COL_ID, JsonSchemaType.NUMBER) + Field.of(COL_ID, JsonSchemaType.NUMBER), ) val actualMessages = MoreIterators.toList(source().read(config(), catalog, null)) + assertStreamStatusTraceMessageIndex( + 0, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessages.size - 1, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.COMPLETE + ) + ) + setEmittedAtToNull(actualMessages) - val expectedMessages = airbyteMessagesReadOneColumn - val actualRecordMessages = filterRecords(actualMessages) - Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size) - Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages)) - Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages)) + val expectedMessages: MutableList = airbyteMessagesReadOneColumn + + expectedMessages.addFirst( + AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair(streamName(), defaultNamespace), + AirbyteStreamStatus.STARTED + ) + ) + ) + + expectedMessages.addLast( + 
AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair(streamName(), defaultNamespace), + AirbyteStreamStatus.COMPLETE + ) + ) + ) + setTraceEmittedAtToNull(actualMessages) + setTraceEmittedAtToNull(expectedMessages) + + Assertions.assertEquals(expectedMessages.size, actualMessages.size) + Assertions.assertTrue(expectedMessages.containsAll(actualMessages)) + Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) } - protected open val airbyteMessagesReadOneColumn: List + protected open val airbyteMessagesReadOneColumn: MutableList get() { val expectedMessages = testMessages @@ -419,9 +484,10 @@ abstract class JdbcSourceAcceptanceTest> { (m.record.data as ObjectNode).remove(COL_UPDATED_AT) (m.record.data as ObjectNode).replace( COL_ID, - convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) + convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()), ) } + .toMutableList() return expectedMessages } @@ -444,8 +510,8 @@ abstract class JdbcSourceAcceptanceTest> { streamName2, defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), - Field.of(COL_NAME, JsonSchemaType.STRING) - ) + Field.of(COL_NAME, JsonSchemaType.STRING), + ), ) expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2)) @@ -480,7 +546,7 @@ abstract class JdbcSourceAcceptanceTest> { streamName2, defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), - Field.of(COL_NAME, JsonSchemaType.STRING) + Field.of(COL_NAME, JsonSchemaType.STRING), ) airbyteStream2.syncMode = SyncMode.INCREMENTAL airbyteStream2.cursorField = java.util.List.of(COL_ID) @@ -490,6 +556,44 @@ abstract class JdbcSourceAcceptanceTest> { expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2)) val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null)) + + assertStreamStatusTraceMessageIndex( + 0, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName2, + 
AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessages.size - 7, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName2, + AirbyteStreamStatus.COMPLETE + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessages.size - 6, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessages.size - 1, + actualMessages, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.COMPLETE + ) + ) + val actualRecordMessages = filterRecords(actualMessages) setEmittedAtToNull(actualMessages) @@ -508,7 +612,7 @@ abstract class JdbcSourceAcceptanceTest> { (m.record.data as ObjectNode).remove(COL_UPDATED_AT) (m.record.data as ObjectNode).replace( COL_ID, - convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) + convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()), ) } } @@ -523,8 +627,8 @@ abstract class JdbcSourceAcceptanceTest> { .withStreams( java.util.List.of( getConfiguredCatalogWithOneStream(defaultNamespace).streams[0], - streamForTableWithSpaces - ) + streamForTableWithSpaces, + ), ) val actualMessages = MoreIterators.toList(source().read(config(), catalog, null)) val actualRecordMessages = filterRecords(actualMessages) @@ -548,7 +652,7 @@ abstract class JdbcSourceAcceptanceTest> { .withStreams( java.util.List.of( getConfiguredCatalogWithOneStream(defaultNamespace).streams[0], - ) + ), ) val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null)) val actualRecordMessages = filterRecords(actualMessages) @@ -574,12 +678,12 @@ abstract class JdbcSourceAcceptanceTest> { m.record.stream = streamForTableWithSpaces.stream.name (m.record.data as ObjectNode).set( COL_LAST_NAME_WITH_SPACE, - (m.record.data as ObjectNode).remove(COL_NAME) + (m.record.data as ObjectNode).remove(COL_NAME), ) (m.record.data as 
ObjectNode).remove(COL_UPDATED_AT) (m.record.data as ObjectNode).replace( COL_ID, - convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) + convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()), ) } } @@ -615,7 +719,7 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "patent", "vash", - java.util.List.of(testMessages[0], testMessages[2]) + java.util.List.of(testMessages[0], testMessages[2]), ) } @@ -632,7 +736,7 @@ abstract class JdbcSourceAcceptanceTest> { "patent", "vash", expectedRecordMessages, - streamWithSpaces + streamWithSpaces, ) } @@ -644,7 +748,7 @@ abstract class JdbcSourceAcceptanceTest> { (firstMessage.record.data as ObjectNode).remove(COL_UPDATED_AT) (firstMessage.record.data as ObjectNode).set( COL_LAST_NAME_WITH_SPACE, - (firstMessage.record.data as ObjectNode).remove(COL_NAME) + (firstMessage.record.data as ObjectNode).remove(COL_NAME), ) val secondMessage = testMessages[2] @@ -652,7 +756,7 @@ abstract class JdbcSourceAcceptanceTest> { (secondMessage.record.data as ObjectNode).remove(COL_UPDATED_AT) (secondMessage.record.data as ObjectNode).set( COL_LAST_NAME_WITH_SPACE, - (secondMessage.record.data as ObjectNode).remove(COL_NAME) + (secondMessage.record.data as ObjectNode).remove(COL_NAME), ) return java.util.List.of(firstMessage, secondMessage) @@ -670,7 +774,7 @@ abstract class JdbcSourceAcceptanceTest> { COL_UPDATED_AT, "2005-10-18", "2006-10-19", - java.util.List.of(testMessages[1], testMessages[2]) + java.util.List.of(testMessages[1], testMessages[2]), ) } @@ -686,13 +790,13 @@ abstract class JdbcSourceAcceptanceTest> { // records to (incorrectly) be filtered out. 
"data", "vash", - testMessages + testMessages, ) } @Test @Throws(Exception::class) - protected fun testReadOneTableIncrementallyTwice() { + protected open fun testReadOneTableIncrementallyTwice() { val config = config() val namespace = defaultNamespace val configuredCatalog = getConfiguredCatalogWithOneStream(namespace) @@ -701,14 +805,37 @@ abstract class JdbcSourceAcceptanceTest> { airbyteStream.syncMode = SyncMode.INCREMENTAL airbyteStream.cursorField = java.util.List.of(COL_ID) airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND - } + }, ) val actualMessagesFirstSync = MoreIterators.toList( - source().read(config, configuredCatalog, createEmptyState(streamName(), namespace)) + source()!!.read( + config, + configuredCatalog, + createEmptyState(streamName(), namespace), + ), ) + assertStreamStatusTraceMessageIndex( + 0, + actualMessagesFirstSync, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessagesFirstSync.size - 1, + actualMessagesFirstSync, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.COMPLETE + ) + ) + val stateAfterFirstSyncOptional = actualMessagesFirstSync .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } @@ -718,20 +845,69 @@ abstract class JdbcSourceAcceptanceTest> { val actualMessagesSecondSync = MoreIterators.toList( - source().read(config, configuredCatalog, extractState(stateAfterFirstSyncOptional)) + source()!!.read( + config, + configuredCatalog, + extractState(stateAfterFirstSyncOptional), + ), + ) + + assertStreamStatusTraceMessageIndex( + 0, + actualMessagesSecondSync, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + streamName(), + AirbyteStreamStatus.STARTED + ) + ) + assertStreamStatusTraceMessageIndex( + actualMessagesSecondSync.size - 1, + actualMessagesSecondSync, + createAirbteStreanStatusTraceMessage( + defaultNamespace, + 
streamName(), + AirbyteStreamStatus.COMPLETE ) + ) Assertions.assertEquals( 2, actualMessagesSecondSync .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .count() - .toInt() + .toInt(), ) - val expectedMessages = getExpectedAirbyteMessagesSecondSync(namespace) + val expectedMessages: MutableList = + getExpectedAirbyteMessagesSecondSync(namespace) setEmittedAtToNull(actualMessagesSecondSync) + expectedMessages.addFirst( + AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair( + configuredCatalog.streams[0].stream.name, + defaultNamespace + ), + AirbyteStreamStatus.STARTED + ) + ) + ) + + expectedMessages.addLast( + AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair( + configuredCatalog.streams[0].stream.name, + defaultNamespace + ), + AirbyteStreamStatus.COMPLETE + ) + ) + ) + setTraceEmittedAtToNull(actualMessagesSecondSync) + setTraceEmittedAtToNull(expectedMessages) Assertions.assertEquals(expectedMessages.size, actualMessagesSecondSync.size) Assertions.assertTrue(expectedMessages.containsAll(actualMessagesSecondSync)) Assertions.assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)) @@ -741,17 +917,17 @@ abstract class JdbcSourceAcceptanceTest> { testdb ?.with( "INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME) + getFullyQualifiedTableName(TABLE_NAME), ) ?.with( "INSERT INTO %s (id, name, updated_at) VALUES (5, 'data', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME) + getFullyQualifiedTableName(TABLE_NAME), ) } protected open fun getExpectedAirbyteMessagesSecondSync( namespace: String? 
- ): List { + ): MutableList { val expectedMessages: MutableList = ArrayList() expectedMessages.add( AirbyteMessage() @@ -768,11 +944,11 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "riker", COL_UPDATED_AT, - "2006-10-19" - ) - ) - ) - ) + "2006-10-19", + ), + ), + ), + ), ) expectedMessages.add( AirbyteMessage() @@ -789,11 +965,11 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "data", COL_UPDATED_AT, - "2006-10-19" - ) - ) - ) - ) + "2006-10-19", + ), + ), + ), + ), ) val state = DbStreamState() @@ -825,21 +1001,24 @@ abstract class JdbcSourceAcceptanceTest> { streamName2, namespace, Field.of(COL_ID, JsonSchemaType.NUMBER), - Field.of(COL_NAME, JsonSchemaType.STRING) - ) + Field.of(COL_NAME, JsonSchemaType.STRING), + ), ) configuredCatalog.streams.forEach( Consumer { airbyteStream: ConfiguredAirbyteStream -> airbyteStream.syncMode = SyncMode.INCREMENTAL airbyteStream.cursorField = java.util.List.of(COL_ID) airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND - } + }, ) val actualMessagesFirstSync = MoreIterators.toList( - source() - .read(config(), configuredCatalog, createEmptyState(streamName(), namespace)) + source()!!.read( + config(), + configuredCatalog, + createEmptyState(streamName(), namespace), + ), ) // get last state message. 
@@ -865,7 +1044,7 @@ abstract class JdbcSourceAcceptanceTest> { DbStreamState() .withStreamName(streamName2) .withStreamNamespace(namespace) - .withCursorField(java.util.List.of(COL_ID)) + .withCursorField(java.util.List.of(COL_ID)), ) // Represents the state after both streams have been updated @@ -882,16 +1061,16 @@ abstract class JdbcSourceAcceptanceTest> { .withStreamNamespace(namespace) .withCursorField(java.util.List.of(COL_ID)) .withCursor("3") - .withCursorRecordCount(1L) + .withCursorRecordCount(1L), ) val expectedMessagesFirstSync: MutableList = ArrayList(testMessages) expectedMessagesFirstSync.add( - createStateMessage(expectedStateStreams1[0], expectedStateStreams1, 3L) + createStateMessage(expectedStateStreams1[0], expectedStateStreams1, 3L), ) expectedMessagesFirstSync.addAll(secondStreamExpectedMessages) expectedMessagesFirstSync.add( - createStateMessage(expectedStateStreams2[1], expectedStateStreams2, 3L) + createStateMessage(expectedStateStreams2[1], expectedStateStreams2, 3L), ) setEmittedAtToNull(actualMessagesFirstSync) @@ -911,7 +1090,7 @@ abstract class JdbcSourceAcceptanceTest> { (m.record.data as ObjectNode).remove(COL_UPDATED_AT) (m.record.data as ObjectNode).replace( COL_ID, - convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()) + convertIdBasedOnDatabase(m.record.data[COL_ID].asInt()), ) } } @@ -929,7 +1108,7 @@ abstract class JdbcSourceAcceptanceTest> { cursorField, initialCursorValue, endCursorValue, - expectedRecordMessages + expectedRecordMessages, ) } @@ -943,7 +1122,7 @@ abstract class JdbcSourceAcceptanceTest> { String.format( "name VARCHAR(200) NOT NULL, %s %s NOT NULL", COL_TIMESTAMP, - COL_TIMESTAMP_TYPE + COL_TIMESTAMP_TYPE, ) // 1st sync @@ -953,13 +1132,13 @@ abstract class JdbcSourceAcceptanceTest> { INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "a", - "2021-01-01 00:00:00" + "2021-01-01 00:00:00", ) .with( INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "b", - "2021-01-01 00:00:00" + 
"2021-01-01 00:00:00", ) val configuredCatalog = @@ -973,11 +1152,11 @@ abstract class JdbcSourceAcceptanceTest> { Field.of(COL_NAME, JsonSchemaType.STRING), Field.of( COL_TIMESTAMP, - JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE - ) - ) - ) - ) + JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE, + ), + ), + ), + ), ) configuredCatalog.streams.forEach( @@ -985,17 +1164,16 @@ abstract class JdbcSourceAcceptanceTest> { airbyteStream.syncMode = SyncMode.INCREMENTAL airbyteStream.cursorField = java.util.List.of(COL_TIMESTAMP) airbyteStream.destinationSyncMode = DestinationSyncMode.APPEND - } + }, ) val firstSyncActualMessages = MoreIterators.toList( - source() - .read( - config(), - configuredCatalog, - createEmptyState(TABLE_NAME_AND_TIMESTAMP, namespace) - ) + source()!!.read( + config(), + configuredCatalog, + createEmptyState(TABLE_NAME_AND_TIMESTAMP, namespace), + ), ) // cursor after 1st sync: 2021-01-01 00:00:00, count 2 @@ -1006,7 +1184,7 @@ abstract class JdbcSourceAcceptanceTest> { val firstSyncState = getStateData(firstSyncStateOptional, TABLE_NAME_AND_TIMESTAMP) Assertions.assertEquals( firstSyncState["cursor_field"].elements().next().asText(), - COL_TIMESTAMP + COL_TIMESTAMP, ) Assertions.assertTrue(firstSyncState["cursor"].asText().contains("2021-01-01")) Assertions.assertTrue(firstSyncState["cursor"].asText().contains("00:00:00")) @@ -1024,7 +1202,7 @@ abstract class JdbcSourceAcceptanceTest> { ) { MatcherAssert.assertThat( listOf("a", "b"), - Matchers.containsInAnyOrder(*firstSyncNames.toTypedArray()) + Matchers.containsInAnyOrder(*firstSyncNames.toTypedArray()), ) } else { Assertions.assertEquals(listOf("a", "b"), firstSyncNames) @@ -1035,17 +1213,16 @@ abstract class JdbcSourceAcceptanceTest> { INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "c", - "2021-01-02 00:00:00" + "2021-01-02 00:00:00", ) val secondSyncActualMessages = MoreIterators.toList( - source() - .read( - config(), - configuredCatalog, - 
createState(TABLE_NAME_AND_TIMESTAMP, namespace, firstSyncState) - ) + source()!!.read( + config(), + configuredCatalog, + createState(TABLE_NAME_AND_TIMESTAMP, namespace, firstSyncState), + ), ) // cursor after 2nd sync: 2021-01-02 00:00:00, count 1 @@ -1056,7 +1233,7 @@ abstract class JdbcSourceAcceptanceTest> { val secondSyncState = getStateData(secondSyncStateOptional, TABLE_NAME_AND_TIMESTAMP) Assertions.assertEquals( secondSyncState["cursor_field"].elements().next().asText(), - COL_TIMESTAMP + COL_TIMESTAMP, ) Assertions.assertTrue(secondSyncState["cursor"].asText().contains("2021-01-02")) Assertions.assertTrue(secondSyncState["cursor"].asText().contains("00:00:00")) @@ -1075,29 +1252,28 @@ abstract class JdbcSourceAcceptanceTest> { INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "d", - "2021-01-02 00:00:00" + "2021-01-02 00:00:00", ) .with( INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "e", - "2021-01-02 00:00:00" + "2021-01-02 00:00:00", ) .with( INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY, fullyQualifiedTableName, "f", - "2021-01-03 00:00:00" + "2021-01-03 00:00:00", ) val thirdSyncActualMessages = MoreIterators.toList( - source() - .read( - config(), - configuredCatalog, - createState(TABLE_NAME_AND_TIMESTAMP, namespace, secondSyncState) - ) + source()!!.read( + config(), + configuredCatalog, + createState(TABLE_NAME_AND_TIMESTAMP, namespace, secondSyncState), + ), ) // Cursor after 3rd sync is: 2021-01-03 00:00:00, count 1. 
@@ -1108,7 +1284,7 @@ abstract class JdbcSourceAcceptanceTest> { val thirdSyncState = getStateData(thirdSyncStateOptional, TABLE_NAME_AND_TIMESTAMP) Assertions.assertEquals( thirdSyncState["cursor_field"].elements().next().asText(), - COL_TIMESTAMP + COL_TIMESTAMP, ) Assertions.assertTrue(thirdSyncState["cursor"].asText().contains("2021-01-03")) Assertions.assertTrue(thirdSyncState["cursor"].asText().contains("00:00:00")) @@ -1125,7 +1301,7 @@ abstract class JdbcSourceAcceptanceTest> { if (testdb.databaseDriver == DatabaseDriver.TERADATA) { MatcherAssert.assertThat( listOf("c", "d", "e", "f"), - Matchers.containsInAnyOrder(*thirdSyncExpectedNames.toTypedArray()) + Matchers.containsInAnyOrder(*thirdSyncExpectedNames.toTypedArray()), ) } else { Assertions.assertEquals(listOf("c", "d", "e", "f"), thirdSyncExpectedNames) @@ -1155,12 +1331,12 @@ abstract class JdbcSourceAcceptanceTest> { initialCursorValue, endCursorValue, expectedRecordMessages, - getConfiguredCatalogWithOneStream(defaultNamespace).streams[0] + getConfiguredCatalogWithOneStream(defaultNamespace).streams[0], ) } @Throws(Exception::class) - protected fun incrementalCursorCheck( + protected open fun incrementalCursorCheck( initialCursorField: String?, cursorField: String, initialCursorValue: String?, @@ -1179,12 +1355,11 @@ abstract class JdbcSourceAcceptanceTest> { val actualMessages = MoreIterators.toList( - source() - .read( - config(), - configuredCatalog, - Jsons.jsonNode(createState(java.util.List.of(dbStreamState))) - ) + source()!!.read( + config(), + configuredCatalog, + Jsons.jsonNode(createState(java.util.List.of(dbStreamState))), + ), ) setEmittedAtToNull(actualMessages) @@ -1194,9 +1369,34 @@ abstract class JdbcSourceAcceptanceTest> { val expectedMessages: MutableList = ArrayList(expectedRecordMessages) expectedMessages.addAll( - createExpectedTestMessages(expectedStreams, expectedRecordMessages.size.toLong()) + createExpectedTestMessages(expectedStreams, 
expectedRecordMessages.size.toLong()), + ) + + expectedMessages.addFirst( + AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair( + airbyteStream.stream.name, + airbyteStream.stream.namespace + ), + AirbyteStreamStatus.STARTED + ) + ) ) + expectedMessages.addLast( + AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage( + AirbyteStreamStatusHolder( + AirbyteStreamNameNamespacePair( + airbyteStream.stream.name, + airbyteStream.stream.namespace + ), + AirbyteStreamStatus.COMPLETE + ) + ) + ) + setTraceEmittedAtToNull(actualMessages) + setTraceEmittedAtToNull(expectedMessages) Assertions.assertEquals(expectedMessages.size, actualMessages.size) Assertions.assertTrue(expectedMessages.containsAll(actualMessages)) Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) @@ -1238,10 +1438,10 @@ abstract class JdbcSourceAcceptanceTest> { defaultNamespace, Field.of(COL_ID, JsonSchemaType.INTEGER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), ) .withSupportedSyncModes( - java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), ) .withSourceDefinedPrimaryKey(java.util.List.of(java.util.List.of(COL_ID))), CatalogHelpers.createAirbyteStream( @@ -1249,10 +1449,10 @@ abstract class JdbcSourceAcceptanceTest> { defaultNamespace, Field.of(COL_ID, JsonSchemaType.INTEGER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), ) .withSupportedSyncModes( - java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), ) .withSourceDefinedPrimaryKey(emptyList()), CatalogHelpers.createAirbyteStream( @@ -1260,18 +1460,18 @@ abstract class JdbcSourceAcceptanceTest> { 
defaultNamespace, Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING), ) .withSupportedSyncModes( - java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL) + java.util.List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), ) .withSourceDefinedPrimaryKey( java.util.List.of( java.util.List.of(COL_FIRST_NAME), - java.util.List.of(COL_LAST_NAME) - ) - ) - ) + java.util.List.of(COL_LAST_NAME), + ), + ), + ), ) } @@ -1292,10 +1492,10 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "picard", COL_UPDATED_AT, - "2004-10-19" - ) - ) - ) + "2004-10-19", + ), + ), + ), ), AirbyteMessage() .withType(AirbyteMessage.Type.RECORD) @@ -1311,10 +1511,10 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "crusher", COL_UPDATED_AT, - "2005-10-19" - ) - ) - ) + "2005-10-19", + ), + ), + ), ), AirbyteMessage() .withType(AirbyteMessage.Type.RECORD) @@ -1330,11 +1530,11 @@ abstract class JdbcSourceAcceptanceTest> { COL_NAME, "vash", COL_UPDATED_AT, - "2006-10-19" - ) - ) - ) - ) + "2006-10-19", + ), + ), + ), + ), ) protected open fun createExpectedTestMessages( @@ -1357,7 +1557,9 @@ abstract class JdbcSourceAcceptanceTest> { .withStreamState(Jsons.jsonNode(s)) ) .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(states))) - .withSourceStats(AirbyteStateStats().withRecordCount(numRecords.toDouble())) + .withSourceStats( + AirbyteStateStats().withRecordCount(numRecords.toDouble()) + ), ) } } @@ -1373,7 +1575,7 @@ abstract class JdbcSourceAcceptanceTest> { .withNamespace(s.streamNamespace) .withName(s.streamName) ) - .withStreamState(Jsons.jsonNode(s)) + .withStreamState(Jsons.jsonNode(s)), ) } } @@ -1392,17 +1594,17 @@ abstract class JdbcSourceAcceptanceTest> { getFullyQualifiedTableName( RelationalDbQueryUtils.enquoteIdentifier( tableNameWithSpaces, - identifierQuoteString - ) + identifierQuoteString, + ), ), "id 
INTEGER, " + RelationalDbQueryUtils.enquoteIdentifier( COL_LAST_NAME_WITH_SPACE, - identifierQuoteString + identifierQuoteString, ) + " VARCHAR(200)", - "" - ) + "", + ), ) connection .createStatement() @@ -1412,14 +1614,14 @@ abstract class JdbcSourceAcceptanceTest> { getFullyQualifiedTableName( RelationalDbQueryUtils.enquoteIdentifier( tableNameWithSpaces, - identifierQuoteString - ) + identifierQuoteString, + ), ), RelationalDbQueryUtils.enquoteIdentifier( COL_LAST_NAME_WITH_SPACE, - identifierQuoteString - ) - ) + identifierQuoteString, + ), + ), ) connection .createStatement() @@ -1429,14 +1631,14 @@ abstract class JdbcSourceAcceptanceTest> { getFullyQualifiedTableName( RelationalDbQueryUtils.enquoteIdentifier( tableNameWithSpaces, - identifierQuoteString - ) + identifierQuoteString, + ), ), RelationalDbQueryUtils.enquoteIdentifier( COL_LAST_NAME_WITH_SPACE, - identifierQuoteString - ) - ) + identifierQuoteString, + ), + ), ) connection .createStatement() @@ -1446,21 +1648,21 @@ abstract class JdbcSourceAcceptanceTest> { getFullyQualifiedTableName( RelationalDbQueryUtils.enquoteIdentifier( tableNameWithSpaces, - identifierQuoteString - ) + identifierQuoteString, + ), ), RelationalDbQueryUtils.enquoteIdentifier( COL_LAST_NAME_WITH_SPACE, - identifierQuoteString - ) - ) + identifierQuoteString, + ), + ), ) } return CatalogHelpers.createConfiguredAirbyteStream( streamName2, defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), - Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaType.STRING) + Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaType.STRING), ) } @@ -1510,8 +1712,8 @@ abstract class JdbcSourceAcceptanceTest> { .withStream( AirbyteStreamState() .withStreamDescriptor( - StreamDescriptor().withName(streamName).withNamespace(streamNamespace) - ) + StreamDescriptor().withName(streamName).withNamespace(streamNamespace), + ), ) return Jsons.jsonNode(java.util.List.of(airbyteStateMessage)) } @@ -1527,9 +1729,9 @@ abstract class JdbcSourceAcceptanceTest> { 
.withStream( AirbyteStreamState() .withStreamDescriptor( - StreamDescriptor().withName(streamName).withNamespace(streamNamespace) + StreamDescriptor().withName(streamName).withNamespace(streamNamespace), ) - .withStreamState(stateData) + .withStreamState(stateData), ) return Jsons.jsonNode(java.util.List.of(airbyteStateMessage)) } @@ -1554,11 +1756,11 @@ abstract class JdbcSourceAcceptanceTest> { .withStreamDescriptor( StreamDescriptor() .withNamespace(streamNamespace) - .withName(streamName) + .withName(streamName), ) - .withStreamState(jsonStreamState) + .withStreamState(jsonStreamState), ) - .withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())) + .withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())), ) } @@ -1577,12 +1779,12 @@ abstract class JdbcSourceAcceptanceTest> { .withStreamDescriptor( StreamDescriptor() .withNamespace(dbStreamState.streamNamespace) - .withName(dbStreamState.streamName) + .withName(dbStreamState.streamName), ) - .withStreamState(Jsons.jsonNode(dbStreamState)) + .withStreamState(Jsons.jsonNode(dbStreamState)), ) .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(legacyStates))) - .withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())) + .withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())), ) } @@ -1632,7 +1834,7 @@ abstract class JdbcSourceAcceptanceTest> { AirbyteRecordMessage() .withData(Jsons.jsonNode(data)) .withStream(stream) - .withNamespace(namespace) + .withNamespace(namespace), ) } @@ -1699,5 +1901,14 @@ abstract class JdbcSourceAcceptanceTest> { } } } + + @JvmStatic + protected fun setTraceEmittedAtToNull(traceMessages: Iterable) { + for (traceMessage in traceMessages) { + if (traceMessage.trace != null) { + traceMessage.trace.emittedAt = null + } + } + } } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/CompositeIterator.kt 
b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/CompositeIterator.kt index a871c860cf7c..5ef2b5590fa2 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/CompositeIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/CompositeIterator.kt @@ -33,7 +33,8 @@ private val LOGGER = KotlinLogging.logger {} class CompositeIterator internal constructor( iterators: List>, - airbyteStreamStatusConsumer: Consumer? + airbyteStreamStatusConsumer: Consumer?, + emitStatus: Boolean = false ) : AbstractIterator(), AutoCloseableIterator { private val airbyteStreamStatusConsumer: Optional> private val iterators: List> @@ -41,6 +42,7 @@ internal constructor( private var i: Int private val seenIterators: MutableSet> private var hasClosed: Boolean + private var emitStatus: Boolean = emitStatus init { Preconditions.checkNotNull(iterators) @@ -65,16 +67,20 @@ internal constructor( while (!currentIterator().hasNext()) { try { currentIterator().close() - emitStartStreamStatus(currentIterator().airbyteStream) - StreamStatusUtils.emitCompleteStreamStatus( - airbyteStream, - airbyteStreamStatusConsumer - ) + if (emitStatus) { + emitStartStreamStatus(currentIterator().airbyteStream) + StreamStatusUtils.emitCompleteStreamStatus( + airbyteStream, + airbyteStreamStatusConsumer + ) + } } catch (e: Exception) { - StreamStatusUtils.emitIncompleteStreamStatus( - airbyteStream, - airbyteStreamStatusConsumer - ) + if (emitStatus) { + StreamStatusUtils.emitIncompleteStreamStatus( + airbyteStream, + airbyteStreamStatusConsumer + ) + } throw RuntimeException(e) } @@ -86,17 +92,27 @@ internal constructor( } try { - val isFirstRun = emitStartStreamStatus(currentIterator().airbyteStream) + var isFirstRun = false + if (emitStatus) { + isFirstRun = emitStartStreamStatus(currentIterator().airbyteStream) + } val next = currentIterator().next() - if (isFirstRun) { - 
StreamStatusUtils.emitRunningStreamStatus( + if (emitStatus) { + if (isFirstRun) { + StreamStatusUtils.emitRunningStreamStatus( + airbyteStream, + airbyteStreamStatusConsumer + ) + } + } + return next + } catch (e: RuntimeException) { + if (emitStatus) { + StreamStatusUtils.emitIncompleteStreamStatus( airbyteStream, airbyteStreamStatusConsumer ) } - return next - } catch (e: RuntimeException) { - StreamStatusUtils.emitIncompleteStreamStatus(airbyteStream, airbyteStreamStatusConsumer) throw e } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt index b2f6b1579924..09f17fe88f7c 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt @@ -73,7 +73,8 @@ internal class CompositeIteratorTest { airbyteStream3 ) ), - airbyteStreamStatusConsumer + airbyteStreamStatusConsumer, + true ) assertOnCloseInvocations(ImmutableList.of(), ImmutableList.of(onClose1, onClose2, onClose3)) @@ -122,7 +123,8 @@ internal class CompositeIteratorTest { airbyteStream3 ) ), - airbyteStreamStatusConsumer + airbyteStreamStatusConsumer, + true ) assertOnCloseInvocations(ImmutableList.of(), ImmutableList.of(onClose1, onClose2, onClose3)) @@ -150,7 +152,8 @@ internal class CompositeIteratorTest { airbyteStream1 ) ), - airbyteStreamStatusConsumer + airbyteStreamStatusConsumer, + true ) assertOnCloseInvocations(ImmutableList.of(), ImmutableList.of(onClose1)) @@ -174,7 +177,8 @@ internal class CompositeIteratorTest { airbyteStream1 ) ), - airbyteStreamStatusConsumer + airbyteStreamStatusConsumer, + true ) assertOnCloseInvocations(ImmutableList.of(), ImmutableList.of(onClose1)) diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle 
b/airbyte-integrations/connectors/source-mysql/build.gradle index 197f62294880..170bae02911f 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.35.4' + cdkVersionRequired = '0.35.11' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 393123ed1445..48b031581d81 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.4.4 + dockerImageTag: 3.4.5 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 013f0e17fdd6..d4fb037c8e02 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -49,11 +49,14 @@ import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory; +import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedConsumer; 
import io.airbyte.commons.json.Jsons; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.commons.stream.AirbyteStreamStatusHolder; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.mysql.cdc.CdcConfigurationHelper; import io.airbyte.integrations.source.mysql.cursor_based.MySqlCursorBasedStateManager; import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager; @@ -65,17 +68,9 @@ import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams; import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus; import io.airbyte.protocol.models.CommonField; -import io.airbyte.protocol.models.v0.AirbyteCatalog; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; +import io.airbyte.protocol.models.v0.*; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import io.airbyte.protocol.models.v0.SyncMode; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.sql.SQLException; @@ -93,6 +88,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.sql.DataSource; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,11 +124,8 @@ public class MySqlSource extends AbstractJdbcSource implements Source public static final String CDC_LOG_FILE = "_ab_cdc_log_file"; public static final String CDC_LOG_POS = "_ab_cdc_log_pos"; public static 
final String CDC_DEFAULT_CURSOR = "_ab_cdc_cursor"; - public static final List SSL_PARAMETERS = List.of( - "useSSL=true", - "requireSSL=true"); - public static Source sshWrappedSource(MySqlSource source) { + public static Source sshWrappedSource(final MySqlSource source) { return new SshWrappedSource(source, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); } @@ -146,7 +139,7 @@ private ConnectorSpecification getCloudDeploymentSpec(final ConnectorSpecificati } @Override - public ConnectorSpecification spec() throws Exception { + public @NotNull ConnectorSpecification spec() throws Exception { if (cloudDeploymentMode()) { return getCloudDeploymentSpec(super.spec()); } @@ -154,7 +147,7 @@ public ConnectorSpecification spec() throws Exception { } @Override - public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + public AirbyteConnectionStatus check(final @NotNull JsonNode config) throws Exception { // #15808 Disallow connecting to db with disable, prefer or allow SSL mode when connecting directly // and not over SSH tunnel if (cloudDeploymentMode()) { @@ -195,13 +188,13 @@ public boolean supportResumableFullRefresh(final JdbcDatabase database, final Co @Override protected void initializeForStateManager(final JdbcDatabase database, - final ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager) { + final @NotNull ConfiguredAirbyteCatalog catalog, + final @NotNull Map>> tableNameToTable, + final @NotNull StateManager stateManager) { if (initialLoadStateManager != null) { return; } - var sourceConfig = database.getSourceConfig(); + final var sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig)) { isSavedOffsetStillPresentOnServer = isSavedOffsetStillPresentOnServer(database, catalog, stateManager); @@ -480,10 +473,11 @@ public List> getIncrementalIterators(final new MySqlInitialLoadHandler(sourceConfig, database, new MySqlSourceOperations(), getQuoteString(), initialLoadStateManager, 
Optional.of(namespacePair -> Jsons.jsonNode(pairToCursorBasedStatus.get(convertNameNamespacePairFromV0(namespacePair)))), getTableSizeInfoForStreams(database, catalog.getStreams(), getQuoteString())); + // Cursor based incremental iterators are decorated with start and complete status traces final List> initialLoadIterator = new ArrayList<>(initialLoadHandler.getIncrementalIterators( new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()), tableNameToTable, - emittedAt)); + emittedAt, true, true)); // Build Cursor based iterator final List> cursorBasedIterator = @@ -651,4 +645,17 @@ public enum ReplicationMethod { CDC } + @NotNull + @Override + public AutoCloseableIterator augmentWithStreamStatus(@NotNull final ConfiguredAirbyteStream airbyteStream, + @NotNull final AutoCloseableIterator streamItrator) { + final var pair = + new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace()); + final var starterStatus = + new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)); + final var completeStatus = + new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)); + return AutoCloseableIterators.concatWithEagerClose(starterStatus, streamItrator, completeStatus); + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java index 67f928c05f5a..5da57ec89403 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java +++ 
b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadHandler.java @@ -18,6 +18,8 @@ import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; +import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; +import io.airbyte.commons.stream.AirbyteStreamStatusHolder; import io.airbyte.commons.stream.AirbyteStreamUtils; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -26,15 +28,8 @@ import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.CommonField; -import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.*; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.SyncMode; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -83,7 +78,9 @@ public MySqlInitialLoadHandler(final JsonNode config, public List> getIncrementalIterators( final ConfiguredAirbyteCatalog catalog, final Map>> tableNameToTable, - final Instant emittedAt) { + final Instant emittedAt, + final boolean decorateWithStartedStatus, + final boolean decorateWithCompletedStatus) { final List> iteratorList = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : 
catalog.getStreams()) { final AirbyteStream stream = airbyteStream.getStream(); @@ -93,7 +90,16 @@ public List> getIncrementalIterators( if (airbyteStream.getSyncMode().equals(SyncMode.INCREMENTAL)) { final String fullyQualifiedTableName = DbSourceDiscoverUtil.getFullyQualifiedTableName(namespace, streamName); final TableInfo> table = tableNameToTable.get(fullyQualifiedTableName); + if (decorateWithStartedStatus) { + iteratorList.add( + new StreamStatusTraceEmitterIterator(new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED))); + } + iteratorList.add(getIteratorForStream(airbyteStream, table, emittedAt)); + if (decorateWithCompletedStatus) { + iteratorList.add(new StreamStatusTraceEmitterIterator( + new AirbyteStreamStatusHolder(pair, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))); + } } } return iteratorList; @@ -120,8 +126,8 @@ public AutoCloseableIterator getIteratorForStream( final AutoCloseableIterator recordIterator = getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli()); final AutoCloseableIterator recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair); - return augmentWithLogs(recordAndMessageIterator, pair, streamName); + return augmentWithLogs(recordAndMessageIterator, pair, streamName); } private static boolean isCompositePrimaryKey(final ConfiguredAirbyteStream stream) { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index 6dd3020f3c5d..cdd2fd4dbb22 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ 
b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -26,8 +26,10 @@ import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; +import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.stream.AirbyteStreamStatusHolder; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.mysql.MySqlQueryUtils; @@ -43,14 +45,7 @@ import io.airbyte.integrations.source.mysql.internal.models.CursorBasedStatus; import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus; import io.airbyte.protocol.models.CommonField; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStateMessage; -import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.AirbyteStreamState; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.StreamDescriptor; -import io.airbyte.protocol.models.v0.SyncMode; +import io.airbyte.protocol.models.v0.*; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -88,9 +83,9 @@ public static Optional getMySqlFullRefreshInitialLoadHa if (!initialLoadStreams.streamsForInitialLoad().isEmpty()) { // Filter on initialLoadStream - var pair = new AirbyteStreamNameNamespacePair(fullRefreshStream.getStream().getName(), fullRefreshStream.getStream().getNamespace()); - var pkStatus = initialLoadStreams.pairToInitialLoadStatus.get(pair); - Map 
fullRefreshPkStatus; + final var pair = new AirbyteStreamNameNamespacePair(fullRefreshStream.getStream().getName(), fullRefreshStream.getStream().getNamespace()); + final var pkStatus = initialLoadStreams.pairToInitialLoadStatus.get(pair); + final Map fullRefreshPkStatus; if (pkStatus == null) { fullRefreshPkStatus = Map.of(); } else { @@ -223,12 +218,23 @@ public static List> getCdcReadIterators(fi getMySqlInitialLoadHandler(database, emittedAt, quoteString, initialLoadStreams, initialLoadGlobalStateManager, Optional.of(new CdcMetadataInjector(emittedAt.toString(), stateAttributes, metadataInjector))); + // Because initial load streams will be followed by cdc read of those stream, we only decorate with + // complete status trace + // after CDC read is done. initialLoadIterator.addAll(initialLoadHandler.getIncrementalIterators( new ConfiguredAirbyteCatalog().withStreams(initialLoadStreams.streamsForInitialLoad()), tableNameToTable, - emittedAt)); + emittedAt, true, false)); } + final List> cdcStreamsStartStatusEmitters = catalog.getStreams().stream() + .filter(stream -> !initialLoadStreams.streamsForInitialLoad.contains(stream)) + .map(stream -> (AutoCloseableIterator) new StreamStatusTraceEmitterIterator( + new AirbyteStreamStatusHolder( + new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED))) + .toList(); + // Build the incremental CDC iterators. 
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler( sourceConfig, @@ -244,6 +250,14 @@ public static List> getCdcReadIterators(fi final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( propertiesManager, eventConverter, new MySqlCdcSavedInfoFetcher(stateToBeUsed), new MySqlCdcStateHandler(stateManager)); + final List> allStreamsCompleteStatusEmitters = catalog.getStreams().stream() + .filter(stream -> stream.getSyncMode() == SyncMode.INCREMENTAL) + .map(stream -> (AutoCloseableIterator) new StreamStatusTraceEmitterIterator( + new AirbyteStreamStatusHolder( + new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace()), + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE))) + .toList(); + // This starts processing the binglogs as soon as initial sync is complete, this is a bit different // from the current cdc syncs. // We finish the current CDC once the initial snapshot is complete and the next sync starts @@ -251,7 +265,10 @@ public static List> getCdcReadIterators(fi return Collections.singletonList( AutoCloseableIterators.concatWithEagerClose( Stream - .of(initialLoadIterator, Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null))) + .of(initialLoadIterator, + cdcStreamsStartStatusEmitters, + Collections.singletonList(AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)), + allStreamsCompleteStatusEmitters) .flatMap(Collection::stream) .collect(Collectors.toList()), AirbyteTraceMessageUtility::emitStreamStatusTrace)); @@ -448,7 +465,7 @@ private static Optional getPrimaryKeyInfo(final JdbcDatabase dat return Optional.empty(); } - final String pkFieldName = stream.getStream().getSourceDefinedPrimaryKey().get(0).get(0); + final String pkFieldName = stream.getStream().getSourceDefinedPrimaryKey().getFirst().getFirst(); final MysqlType pkFieldType = table.getFields().stream() 
.filter(field -> field.getName().equals(pkFieldName)) .findFirst().get().getType(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 47896534bd9a..fd397230ad89 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -96,7 +96,8 @@ public class CdcMysqlSourceTest extends CdcSourceTest stateMessages, long totalCount) { AtomicLong count = new AtomicLong(0L); - stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue())); + stateMessages.stream().forEach( + stateMessage -> count.addAndGet(stateMessage.getSourceStats() != null ? 
stateMessage.getSourceStats().getRecordCount().longValue() : 0L)); assertEquals(totalCount, count.get()); } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index d276a05677e1..9e877480bce7 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -229,7 +229,8 @@ Any database or table encoding combination of charset and collation is supported ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.4.5 | 2024-05-23 | [38198](https://github.com/airbytehq/airbyte/pull/38198) | Sync sending trace status messages indicating progress. | | 3.4.4 | 2024-05-15 | [38208](https://github.com/airbytehq/airbyte/pull/38208) | disable counts in full refresh stream in state message. | | 3.4.3 | 2024-05-13 | [38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages. | | 3.4.2 | 2024-05-07 | [38046](https://github.com/airbytehq/airbyte/pull/38046) | Resumeable refresh should run only if there is source defined pk. |