diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 452a4f05ae8b..88aa8e9afdea 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -193,8 +193,10 @@ abstract class BasicFunctionalityIntegrationTest( configUpdater = configUpdater, envVars = envVars, ) { - val parsedConfig = - ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents)) + + // Update config with any replacements. This may be necessary when using testcontainers. + val configAsString = configUpdater.update(configContents) + val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) @Test open fun testBasicWrite() { @@ -209,7 +211,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val messages = runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -260,7 +262,7 @@ abstract class BasicFunctionalityIntegrationTest( { if (verifyDataWriting) { dumpAndDiffRecords( - ValidatedJsonUtils.parseOne(configSpecClass, configContents), + ValidatedJsonUtils.parseOne(configSpecClass, configAsString), listOf( OutputRecord( extractedAt = 1234, @@ -321,7 +323,7 @@ abstract class BasicFunctionalityIntegrationTest( val messages = runSync( - configContents, + configAsString, stream, listOf( InputFile( @@ -359,7 +361,7 @@ abstract class BasicFunctionalityIntegrationTest( ) }) - val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents) + val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) val fileContent = dataDumper.dumpFile(config, stream) assertEquals(listOf("123"), fileContent) @@ -380,7 +382,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val stateMessage = runSyncUntilStateAck( - configContents, + this@BasicFunctionalityIntegrationTest.configContents, stream, listOf( InputRecord( @@ -398,7 +400,7 @@ abstract class BasicFunctionalityIntegrationTest( ), allowGracefulShutdown = false, ) - runSync(configContents, stream, emptyList()) + runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList()) val streamName = stateMessage.stream.streamDescriptor.name val streamNamespace = stateMessage.stream.streamDescriptor.namespace @@ -461,7 +463,7 @@ abstract class BasicFunctionalityIntegrationTest( val stream1 = makeStream(randomizedNamespace + "_1") val stream2 = makeStream(randomizedNamespace + "_2") runSync( - configContents, + configAsString, DestinationCatalog( listOf( stream1, @@ -584,7 +586,7 @@ abstract class BasicFunctionalityIntegrationTest( serialized = "", ) } - runSync(configContents, catalog, messages) + runSync(configAsString, catalog, messages) assertAll( catalog.streams.map { stream -> { @@ -624,7 +626,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42), listOf( InputRecord( @@ -637,7 +639,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -719,7 +721,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -758,7 +760,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a status incomplete. This should not delete any existing data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -807,7 +809,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a third sync, this time with a successful status. // This should delete the first sync's data, and retain the second+third syncs' data. runSync( - configContents, + configAsString, stream2, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -888,7 +890,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a stream status incomplete. runSyncUntilStateAck( - configContents, + configAsString, stream, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -923,7 +925,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a second sync, this time with a successful status. // This should retain the first syncs' data. runSync( - configContents, + configAsString, stream, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -1009,7 +1011,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -1049,7 +1051,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a sync, but emit a stream status incomplete. This should not delete any existing // data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -1104,7 +1106,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 43, ) runSync( - configContents, + configAsString, stream3, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -1164,7 +1166,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(syncId = 42), listOf( InputRecord( @@ -1177,7 +1179,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -1230,7 +1232,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream( syncId = 42, linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType) @@ -1250,7 +1252,7 @@ abstract class BasicFunctionalityIntegrationTest( linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType) ) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -1324,7 +1326,7 @@ abstract class BasicFunctionalityIntegrationTest( val sync1Stream = makeStream(syncId = 42) runSync( - configContents, + configAsString, sync1Stream, listOf( // emitted_at:1000 is equal to 1970-01-01 00:00:01Z. @@ -1385,7 +1387,7 @@ abstract class BasicFunctionalityIntegrationTest( val sync2Stream = makeStream(syncId = 43) runSync( - configContents, + configAsString, sync2Stream, listOf( // Update both Alice and Bob @@ -1469,9 +1471,9 @@ abstract class BasicFunctionalityIntegrationTest( // instead of being able to fallback onto extractedAt. emittedAtMs = 100, ) - runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1"))) + runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1"))) val stream2 = makeStream("cursor2") - runSync(configContents, stream2, listOf(makeRecord("cursor2"))) + runSync(configAsString, stream2, listOf(makeRecord("cursor2"))) dumpAndDiffRecords( parsedConfig, listOf( @@ -1528,7 +1530,7 @@ abstract class BasicFunctionalityIntegrationTest( ) } // Just verify that we don't crash. - assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) } + assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) } } /** @@ -1581,7 +1583,7 @@ abstract class BasicFunctionalityIntegrationTest( emittedAtMs = 100, ) runSync( - configContents, + configAsString, stream, listOf( // A record with valid values for all fields @@ -1865,7 +1867,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2044,7 +2046,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2215,7 +2217,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2407,7 +2409,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt index 862c4f6664c5..14b59856d7b8 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt @@ -25,16 +25,23 @@ data class MSSQLConfiguration( class MSSQLConfigurationFactory : DestinationConfigurationFactory { override fun makeWithoutExceptionHandling(pojo: MSSQLSpecification): MSSQLConfiguration { + return makeWithOverrides(spec = pojo) + } + + fun makeWithOverrides( + spec: MSSQLSpecification, + overrides: Map = emptyMap() + ): MSSQLConfiguration { return MSSQLConfiguration( - host = pojo.host, - port = pojo.port, - database = pojo.database, - schema = pojo.schema, - user = pojo.user, - password = pojo.password, - jdbcUrlParams = pojo.jdbcUrlParams, - rawDataSchema = pojo.rawDataSchema, - sslMethod = pojo.sslMethod, + host = overrides.getOrDefault("host", spec.host), + port = overrides.getOrDefault("port", spec.port.toString()).toInt(), + database = overrides.getOrDefault("database", spec.database), + schema = overrides.getOrDefault("schema", spec.schema), + user = overrides.getOrDefault("user", spec.user), + password = overrides.getOrDefault("password", spec.password), + jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams), + rawDataSchema = overrides.getOrDefault("rawDataSchema", spec.rawDataSchema), + sslMethod = spec.sslMethod, ) } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt index c820bb0f9bff..2d4419ec8ead 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt @@ -9,9 +9,7 @@ import io.airbyte.cdk.load.check.CheckTestConfig import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.TestInstance -@TestInstance(TestInstance.Lifecycle.PER_CLASS) internal class MSSQLCheckTest : CheckIntegrationTest( successConfigFilenames = @@ -25,16 +23,20 @@ internal class MSSQLCheckTest : MSSQLTestConfigUtil.getConfigPath("check/fail-internal-schema-invalid.json") ) to "\"iamnotthere\" either does not exist".toPattern(), ), - configUpdater = MSSQLContainerHelper + configUpdater = MSSQLConfigUpdater() ) { - @BeforeAll - fun beforeAll() { - MSSQLContainerHelper.start() - } + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } - @AfterAll - fun afterAll() { - MSSQLContainerHelper.stop() + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt index e4d8daa76b4d..a8cf2f3320ee 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt @@ -5,8 +5,12 @@ package io.airbyte.integrations.destination.mssql.v2 import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getIpAddress +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getPort +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.testContainer import io.github.oshai.kotlinlogging.KotlinLogging import org.testcontainers.containers.MSSQLServerContainer +import org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT val logger = KotlinLogging.logger {} @@ -14,7 +18,7 @@ val logger = KotlinLogging.logger {} * Helper class for launching/stopping MSSQL Server test containers, as well as updating destination * configuration to match test container configuration. */ -object MSSQLContainerHelper : ConfigurationUpdater { +object MSSQLContainerHelper { private val testContainer = MSSQLServerContainer("mcr.microsoft.com/mssql/server:2022-latest") @@ -32,15 +36,32 @@ object MSSQLContainerHelper : ConfigurationUpdater { testContainer.close() } } - override fun update(config: String): String { - return getHost()?.let { host -> - config.replace("localhost", host).replace("replace_me", testContainer.password) - } - ?: config - } - private fun getHost(): String? { + + fun getHost(): String = testContainer.host + + fun getPassword(): String = testContainer.password + + fun getPort(): Int? = testContainer.firstMappedPort + + fun getIpAddress(): String? { // Ensure that the container is started first start() return testContainer.containerInfo.networkSettings.networks.entries.first().value.ipAddress } } + +class MSSQLConfigUpdater(private val replacePort: Boolean = false) : ConfigurationUpdater { + override fun update(config: String): String { + var updatedConfig = config + updatedConfig = + MSSQLContainerHelper.getIpAddress()?.let { config.replace("localhost", it) } + ?: updatedConfig + if (replacePort) { + updatedConfig = + getPort()?.let { config.replace("$MS_SQL_SERVER_PORT", it.toString()) } + ?: updatedConfig + } + + return updatedConfig.replace("replace_me", MSSQLContainerHelper.getPassword()) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt index a34bf92d2053..e6cf196bec70 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfigurationFactory import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification import io.airbyte.protocol.models.Jsons @@ -20,6 +21,8 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import java.nio.file.Files import java.time.Instant import java.util.UUID +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll abstract class MSSQLWriterTest( configPath: String, @@ -41,6 +44,7 @@ abstract class MSSQLWriterTest( nullEqualsUnset = true, supportFileTransfer = false, envVars = emptyMap(), + configUpdater = MSSQLConfigUpdater() ) class MSSQLDataDumper : DestinationDataDumper { @@ -48,8 +52,7 @@ class MSSQLDataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = - MSSQLConfigurationFactory().makeWithoutExceptionHandling(spec as MSSQLSpecification) + val config = getConfiguration(spec = spec as MSSQLSpecification, stream = stream) val sqlBuilder = MSSQLQueryBuilder(config, stream) val dataSource = DataSourceFactory().dataSource(config) val output = mutableListOf() @@ -106,6 +109,23 @@ class MSSQLDataDumper : DestinationDataDumper { ): List { return emptyList() } + + private fun getConfiguration( + spec: ConfigurationSpecification, + stream: DestinationStream + ): MSSQLConfiguration { + /* + * Replace the host, port and schema to match what is exposed + * by the container and generated by the test suite in the case of the schema name + */ + val configOverrides = + mutableMapOf("host" to MSSQLContainerHelper.getHost()).apply { + MSSQLContainerHelper.getPort()?.let { port -> put("port", port.toString()) } + stream.descriptor.namespace?.let { schema -> put("schema", schema) } + } + return MSSQLConfigurationFactory() + .makeWithOverrides(spec = spec as MSSQLSpecification, overrides = configOverrides) + } } class MSSQLDataCleaner : DestinationCleaner { @@ -114,9 +134,24 @@ class MSSQLDataCleaner : DestinationCleaner { } } -class StandardInsert : +internal class StandardInsert : MSSQLWriterTest( "check/valid.json", MSSQLDataDumper(), MSSQLDataCleaner(), - ) + ) { + + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } + + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } + } +}