diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 00120ffc62b4..ab899a340e00 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -61,6 +61,7 @@ import java.time.OffsetDateTime import java.time.OffsetTime import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking +import org.apache.commons.lang3.RandomStringUtils import org.junit.jupiter.api.Assertions.assertDoesNotThrow import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue @@ -140,14 +141,17 @@ abstract class BasicFunctionalityIntegrationTest( configUpdater = configUpdater, envVars = envVars, ) { - val parsedConfig = - ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents)) + + // Update config with any replacements. This may be necessary when using testcontainers. + val configAsString = configUpdater.update(configContents) + val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) @Test open fun testBasicWrite() { + val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -156,12 +160,12 @@ abstract class BasicFunctionalityIntegrationTest( ) val messages = runSync( - configContents, + configAsString, stream, listOf( InputRecord( namespace = randomizedNamespace, - name = "test_stream", + name = streamName, data = """{"id": 5678, "undeclared": "asdf"}""", emittedAtMs = 1234, changes = @@ -176,7 +180,7 @@ abstract class BasicFunctionalityIntegrationTest( ) ), InputStreamCheckpoint( - streamName = "test_stream", + streamName = streamName, streamNamespace = randomizedNamespace, blob = """{"foo": "bar"}""", sourceRecordCount = 1, @@ -194,7 +198,7 @@ abstract class BasicFunctionalityIntegrationTest( ) assertEquals( StreamCheckpoint( - streamName = "test_stream", + streamName = streamName, streamNamespace = randomizedNamespace, blob = """{"foo": "bar"}""", sourceRecordCount = 1, @@ -207,7 +211,7 @@ abstract class BasicFunctionalityIntegrationTest( { if (verifyDataWriting) { dumpAndDiffRecords( - ValidatedJsonUtils.parseOne(configSpecClass, configContents), + ValidatedJsonUtils.parseOne(configSpecClass, configAsString), listOf( OutputRecord( extractedAt = 1234, @@ -268,7 +272,7 @@ abstract class BasicFunctionalityIntegrationTest( val messages = runSync( - configContents, + configAsString, stream, listOf( InputFile( @@ -306,7 +310,7 @@ abstract class BasicFunctionalityIntegrationTest( ) }) - val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents) + val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) val fileContent = dataDumper.dumpFile(config, stream) assertEquals(listOf("123"), fileContent) @@ -316,9 +320,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testMidSyncCheckpointingStreamState(): Unit = runBlocking(Dispatchers.IO) { + val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -327,27 +332,27 @@ abstract class BasicFunctionalityIntegrationTest( ) val stateMessage = runSyncUntilStateAck( - configContents, + this@BasicFunctionalityIntegrationTest.configContents, stream, listOf( InputRecord( namespace = randomizedNamespace, - name = "test_stream", + name = streamName, data = """{"id": 12}""", emittedAtMs = 1234, ) ), StreamCheckpoint( streamNamespace = randomizedNamespace, - streamName = "test_stream", + streamName = streamName, blob = """{"foo": "bar1"}""", sourceRecordCount = 1 ), allowGracefulShutdown = false, ) - runSync(configContents, stream, emptyList()) + runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList()) - val streamName = stateMessage.stream.streamDescriptor.name + val actualStreamName = stateMessage.stream.streamDescriptor.name val streamNamespace = stateMessage.stream.streamDescriptor.namespace // basic state message checks - this is mostly just exercising the CDK itself, // but is cheap and easy to do. @@ -356,8 +361,8 @@ abstract class BasicFunctionalityIntegrationTest( { assertEquals( streamName, - "test_stream", - "Expected stream name to be test_stream, got $streamName" + actualStreamName, + "Expected stream name to be $streamName, got $actualStreamName" ) }, { @@ -396,9 +401,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testNamespaces() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeStream(namespace: String) = DestinationStream( - DestinationStream.Descriptor(namespace, "test_stream"), + DestinationStream.Descriptor(namespace, streamName), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -408,7 +414,7 @@ abstract class BasicFunctionalityIntegrationTest( val stream1 = makeStream(randomizedNamespace + "_1") val stream2 = makeStream(randomizedNamespace + "_2") runSync( - configContents, + configAsString, DestinationCatalog( listOf( stream1, @@ -531,7 +537,7 @@ abstract class BasicFunctionalityIntegrationTest( serialized = "", ) } - runSync(configContents, catalog, messages) + runSync(configAsString, catalog, messages) assertAll( catalog.streams.map { stream -> { @@ -561,9 +567,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testTruncateRefresh() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeStream(generationId: Long, minimumGenerationId: Long, syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(linkedMapOf("id" to intType, "name" to stringType)), generationId, @@ -571,12 +578,12 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42), listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "name": "first_value"}""", emittedAtMs = 1234L, ) @@ -584,12 +591,12 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "name": "second_value"}""", emittedAtMs = 1234, ) @@ -624,10 +631,11 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testInterruptedTruncateWithPriorData() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -652,7 +660,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a normal sync with nonempty data val stream1 = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType( linkedMapOf( @@ -666,7 +674,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -705,7 +713,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a status incomplete. This should not delete any existing data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -754,7 +762,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a third sync, this time with a successful status. // This should delete the first sync's data, and retain the second+third syncs' data. runSync( - configContents, + configAsString, stream2, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -793,10 +801,11 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testInterruptedTruncateWithoutPriorData() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -820,7 +829,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType( linkedMapOf( @@ -835,7 +844,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a stream status incomplete. runSyncUntilStateAck( - configContents, + configAsString, stream, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -870,7 +879,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a second sync, this time with a successful status. // This should retain the first syncs' data. runSync( - configContents, + configAsString, stream, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -914,10 +923,11 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun resumeAfterCancelledTruncate() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -942,7 +952,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a normal sync with nonempty data val stream1 = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType( linkedMapOf( @@ -956,7 +966,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -996,7 +1006,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a sync, but emit a stream status incomplete. This should not delete any existing // data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -1051,7 +1061,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 43, ) runSync( - configContents, + configAsString, stream3, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -1101,9 +1111,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testAppend() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeStream(syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(linkedMapOf("id" to intType, "name" to stringType)), generationId = 0, @@ -1111,12 +1122,12 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(syncId = 42), listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "name": "first_value"}""", emittedAtMs = 1234L, ) @@ -1124,12 +1135,12 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "name": "second_value"}""", emittedAtMs = 5678L, ) @@ -1167,9 +1178,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testAppendSchemaEvolution() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() fun makeStream(syncId: Long, schema: LinkedHashMap) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(schema), generationId = 0, @@ -1177,7 +1189,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream( syncId = 42, linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType) @@ -1185,7 +1197,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "to_drop": "val1", "to_change": 42}""", emittedAtMs = 1234L, ) @@ -1197,12 +1209,12 @@ abstract class BasicFunctionalityIntegrationTest( linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType) ) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"id": 42, "to_change": "val2", "to_add": "val3"}""", emittedAtMs = 1234, ) @@ -1238,9 +1250,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testDedup() { assumeTrue(supportsDedup) + val streamName = generateStreamName() fun makeStream(syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), importType = Dedupe( primaryKey = listOf(listOf("id1"), listOf("id2")), @@ -1264,14 +1277,14 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(data: String, extractedAt: Long) = InputRecord( randomizedNamespace, - "test_stream", + streamName, data, emittedAtMs = extractedAt, ) val sync1Stream = makeStream(syncId = 42) runSync( - configContents, + configAsString, sync1Stream, listOf( // emitted_at:1000 is equal to 1970-01-01 00:00:01Z. @@ -1332,7 +1345,7 @@ abstract class BasicFunctionalityIntegrationTest( val sync2Stream = makeStream(syncId = 43) runSync( - configContents, + configAsString, sync2Stream, listOf( // Update both Alice and Bob @@ -1387,9 +1400,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testDedupChangeCursor() { assumeTrue(verifyDataWriting && supportsDedup) + val streamName = generateStreamName() fun makeStream(cursor: String) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Dedupe( primaryKey = listOf(listOf("id")), cursor = listOf(cursor), @@ -1409,16 +1423,16 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(cursorName: String) = InputRecord( randomizedNamespace, - "test_stream", + streamName, data = """{"id": 1, "$cursorName": 1, "name": "foo_$cursorName"}""", // this is unrealistic (extractedAt should always increase between syncs), // but it lets us force the dedupe behavior to rely solely on the cursor column, // instead of being able to fallback onto extractedAt. emittedAtMs = 100, ) - runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1"))) + runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1"))) val stream2 = makeStream("cursor2") - runSync(configContents, stream2, listOf(makeRecord("cursor2"))) + runSync(configAsString, stream2, listOf(makeRecord("cursor2"))) dumpAndDiffRecords( parsedConfig, listOf( @@ -1475,7 +1489,7 @@ abstract class BasicFunctionalityIntegrationTest( ) } // Just verify that we don't crash. - assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) } + assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) } } /** @@ -1489,9 +1503,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testBasicTypes() { assumeTrue(verifyDataWriting) + val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType( linkedMapOf( @@ -1523,12 +1538,12 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(data: String) = InputRecord( randomizedNamespace, - "test_stream", + streamName, data, emittedAtMs = 100, ) runSync( - configContents, + configAsString, stream, listOf( // A record with valid values for all fields @@ -1812,7 +1827,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -1975,7 +1990,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2146,7 +2161,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2289,9 +2304,10 @@ abstract class BasicFunctionalityIntegrationTest( * happens sometimes. */ open fun testNoColumns() { + val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + DestinationStream.Descriptor(randomizedNamespace, streamName), Append, ObjectType(linkedMapOf()), generationId = 42, @@ -2299,12 +2315,12 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( randomizedNamespace, - "test_stream", + streamName, """{"foo": "bar"}""", emittedAtMs = 1000L, ) @@ -2331,6 +2347,11 @@ abstract class BasicFunctionalityIntegrationTest( ) } + @Suppress("DEPRECATION") + private fun generateStreamName(): String { + return "test_stream_${RandomStringUtils.randomAlphabetic(4)}" + } + companion object { private val intType = FieldType(IntegerType, nullable = true) private val numberType = FieldType(NumberType, nullable = true) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt index 862c4f6664c5..14b59856d7b8 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt @@ -25,16 +25,23 @@ data class MSSQLConfiguration( class MSSQLConfigurationFactory : DestinationConfigurationFactory { override fun makeWithoutExceptionHandling(pojo: MSSQLSpecification): MSSQLConfiguration { + return makeWithOverrides(spec = pojo) + } + + fun makeWithOverrides( + spec: MSSQLSpecification, + overrides: Map = emptyMap() + ): MSSQLConfiguration { return MSSQLConfiguration( - host = pojo.host, - port = pojo.port, - database = pojo.database, - schema = pojo.schema, - user = pojo.user, - password = pojo.password, - jdbcUrlParams = pojo.jdbcUrlParams, - rawDataSchema = pojo.rawDataSchema, - sslMethod = pojo.sslMethod, + host = overrides.getOrDefault("host", spec.host), + port = overrides.getOrDefault("port", spec.port.toString()).toInt(), + database = overrides.getOrDefault("database", spec.database), + schema = overrides.getOrDefault("schema", spec.schema), + user = overrides.getOrDefault("user", spec.user), + password = overrides.getOrDefault("password", spec.password), + jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams), + rawDataSchema = overrides.getOrDefault("rawDataSchema", spec.rawDataSchema), + sslMethod = spec.sslMethod, ) } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt index c820bb0f9bff..dcda5d2aa7e2 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt @@ -25,7 +25,7 @@ internal class MSSQLCheckTest : MSSQLTestConfigUtil.getConfigPath("check/fail-internal-schema-invalid.json") ) to "\"iamnotthere\" either does not exist".toPattern(), ), - configUpdater = MSSQLContainerHelper + configUpdater = MSSQLConfigUpdater() ) { @BeforeAll diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt index e4d8daa76b4d..a8cf2f3320ee 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt @@ -5,8 +5,12 @@ package io.airbyte.integrations.destination.mssql.v2 import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getIpAddress +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getPort +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.testContainer import io.github.oshai.kotlinlogging.KotlinLogging import org.testcontainers.containers.MSSQLServerContainer +import org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT val logger = KotlinLogging.logger {} @@ -14,7 +18,7 @@ val logger = KotlinLogging.logger {} * Helper class for launching/stopping MSSQL Server test containers, as well as updating destination * configuration to match test container configuration. */ -object MSSQLContainerHelper : ConfigurationUpdater { +object MSSQLContainerHelper { private val testContainer = MSSQLServerContainer("mcr.microsoft.com/mssql/server:2022-latest") @@ -32,15 +36,32 @@ object MSSQLContainerHelper : ConfigurationUpdater { testContainer.close() } } - override fun update(config: String): String { - return getHost()?.let { host -> - config.replace("localhost", host).replace("replace_me", testContainer.password) - } - ?: config - } - private fun getHost(): String? { + + fun getHost(): String = testContainer.host + + fun getPassword(): String = testContainer.password + + fun getPort(): Int? = testContainer.firstMappedPort + + fun getIpAddress(): String? { // Ensure that the container is started first start() return testContainer.containerInfo.networkSettings.networks.entries.first().value.ipAddress } } + +class MSSQLConfigUpdater(private val replacePort: Boolean = false) : ConfigurationUpdater { + override fun update(config: String): String { + var updatedConfig = config + updatedConfig = + MSSQLContainerHelper.getIpAddress()?.let { config.replace("localhost", it) } + ?: updatedConfig + if (replacePort) { + updatedConfig = + getPort()?.let { config.replace("$MS_SQL_SERVER_PORT", it.toString()) } + ?: updatedConfig + } + + return updatedConfig.replace("replace_me", MSSQLContainerHelper.getPassword()) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt index a34bf92d2053..9afe343aff3b 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfigurationFactory import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification import io.airbyte.protocol.models.Jsons @@ -20,6 +21,9 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import java.nio.file.Files import java.time.Instant import java.util.UUID +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.TestInstance abstract class MSSQLWriterTest( configPath: String, @@ -41,6 +45,7 @@ abstract class MSSQLWriterTest( nullEqualsUnset = true, supportFileTransfer = false, envVars = emptyMap(), + configUpdater = MSSQLConfigUpdater() ) class MSSQLDataDumper : DestinationDataDumper { @@ -48,8 +53,7 @@ class MSSQLDataDumper : DestinationDataDumper { spec: ConfigurationSpecification, stream: DestinationStream ): List { - val config = - MSSQLConfigurationFactory().makeWithoutExceptionHandling(spec as MSSQLSpecification) + val config = getConfiguration(spec = spec as MSSQLSpecification, stream = stream) val sqlBuilder = MSSQLQueryBuilder(config, stream) val dataSource = DataSourceFactory().dataSource(config) val output = mutableListOf() @@ -106,6 +110,23 @@ class MSSQLDataDumper : DestinationDataDumper { ): List { return emptyList() } + + private fun getConfiguration( + spec: ConfigurationSpecification, + stream: DestinationStream + ): MSSQLConfiguration { + /* + * Replace the host, port and schema to match what is exposed + * by the container and generated by the test suite in the case of the schema name + */ + val configOverrides = + mutableMapOf("host" to MSSQLContainerHelper.getHost()).apply { + MSSQLContainerHelper.getPort()?.let { port -> put("port", port.toString()) } + stream.descriptor.namespace?.let { schema -> put("schema", schema) } + } + return MSSQLConfigurationFactory() + .makeWithOverrides(spec = spec as MSSQLSpecification, overrides = configOverrides) + } } class MSSQLDataCleaner : DestinationCleaner { @@ -114,9 +135,21 @@ class MSSQLDataCleaner : DestinationCleaner { } } +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class StandardInsert : MSSQLWriterTest( "check/valid.json", MSSQLDataDumper(), MSSQLDataCleaner(), - ) + ) { + + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } + + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } +}