From 46c0e54633a614edc6deb655a7473a1c59c17dd1 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Fri, 17 Jan 2025 09:18:36 -0500 Subject: [PATCH] PR feedback --- .../BasicFunctionalityIntegrationTest.kt | 87 ++++++++----------- .../destination/mssql/v2/MSSQLCheckTest.kt | 20 +++-- .../destination/mssql/v2/MSSQLWriterTest.kt | 22 ++--- 3 files changed, 57 insertions(+), 72 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index ab899a340e00..92552d991e2d 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -61,7 +61,6 @@ import java.time.OffsetDateTime import java.time.OffsetTime import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking -import org.apache.commons.lang3.RandomStringUtils import org.junit.jupiter.api.Assertions.assertDoesNotThrow import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue @@ -148,10 +147,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testBasicWrite() { - val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -165,7 +163,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( namespace = randomizedNamespace, - name = streamName, + name = "test_stream", data = """{"id": 5678, "undeclared": "asdf"}""", emittedAtMs = 1234, changes = @@ -180,7 +178,7 @@ abstract class BasicFunctionalityIntegrationTest( ) ), InputStreamCheckpoint( - streamName = streamName, + streamName = "test_stream", streamNamespace = randomizedNamespace, blob = """{"foo": "bar"}""", sourceRecordCount = 1, @@ -198,7 +196,7 @@ abstract class BasicFunctionalityIntegrationTest( ) assertEquals( StreamCheckpoint( - streamName = streamName, + streamName = "test_stream", streamNamespace = randomizedNamespace, blob = """{"foo": "bar"}""", sourceRecordCount = 1, @@ -320,10 +318,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testMidSyncCheckpointingStreamState(): Unit = runBlocking(Dispatchers.IO) { - val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -337,14 +334,14 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( namespace = randomizedNamespace, - name = streamName, + name = "test_stream", data = """{"id": 12}""", emittedAtMs = 1234, ) ), StreamCheckpoint( streamNamespace = randomizedNamespace, - streamName = streamName, + streamName = "test_stream", blob = """{"foo": "bar1"}""", sourceRecordCount = 1 ), @@ -352,7 +349,7 @@ abstract class BasicFunctionalityIntegrationTest( ) runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList()) - val actualStreamName = stateMessage.stream.streamDescriptor.name + val streamName = stateMessage.stream.streamDescriptor.name val streamNamespace = stateMessage.stream.streamDescriptor.namespace // basic state message checks - this is mostly just exercising the CDK itself, // but is cheap and easy to do. @@ -361,8 +358,8 @@ abstract class BasicFunctionalityIntegrationTest( { assertEquals( streamName, - actualStreamName, - "Expected stream name to be $streamName, got $actualStreamName" + "test_stream", + "Expected stream name to be test_stream, got $streamName" ) }, { @@ -401,10 +398,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testNamespaces() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeStream(namespace: String) = DestinationStream( - DestinationStream.Descriptor(namespace, streamName), + DestinationStream.Descriptor(namespace, "test_stream"), Append, ObjectType(linkedMapOf("id" to intType)), generationId = 0, @@ -567,10 +563,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testTruncateRefresh() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeStream(generationId: Long, minimumGenerationId: Long, syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(linkedMapOf("id" to intType, "name" to stringType)), generationId, @@ -583,7 +578,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "name": "first_value"}""", emittedAtMs = 1234L, ) @@ -596,7 +591,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "name": "second_value"}""", emittedAtMs = 1234, ) @@ -631,11 +626,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testInterruptedTruncateWithPriorData() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -660,7 +654,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a normal sync with nonempty data val stream1 = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType( linkedMapOf( @@ -801,11 +795,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testInterruptedTruncateWithoutPriorData() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -829,7 +822,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType( linkedMapOf( @@ -923,11 +916,10 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun resumeAfterCancelledTruncate() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) = InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""", emittedAtMs = extractedAt, ) @@ -952,7 +944,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a normal sync with nonempty data val stream1 = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType( linkedMapOf( @@ -1111,10 +1103,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testAppend() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeStream(syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(linkedMapOf("id" to intType, "name" to stringType)), generationId = 0, @@ -1127,7 +1118,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "name": "first_value"}""", emittedAtMs = 1234L, ) @@ -1140,7 +1131,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "name": "second_value"}""", emittedAtMs = 5678L, ) @@ -1178,10 +1169,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testAppendSchemaEvolution() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() fun makeStream(syncId: Long, schema: LinkedHashMap) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(schema), generationId = 0, @@ -1197,7 +1187,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "to_drop": "val1", "to_change": 42}""", emittedAtMs = 1234L, ) @@ -1214,7 +1204,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"id": 42, "to_change": "val2", "to_add": "val3"}""", emittedAtMs = 1234, ) @@ -1250,10 +1240,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testDedup() { assumeTrue(supportsDedup) - val streamName = generateStreamName() fun makeStream(syncId: Long) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), importType = Dedupe( primaryKey = listOf(listOf("id1"), listOf("id2")), @@ -1277,7 +1266,7 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(data: String, extractedAt: Long) = InputRecord( randomizedNamespace, - streamName, + "test_stream", data, emittedAtMs = extractedAt, ) @@ -1400,10 +1389,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testDedupChangeCursor() { assumeTrue(verifyDataWriting && supportsDedup) - val streamName = generateStreamName() fun makeStream(cursor: String) = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Dedupe( primaryKey = listOf(listOf("id")), cursor = listOf(cursor), @@ -1423,7 +1411,7 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(cursorName: String) = InputRecord( randomizedNamespace, - streamName, + "test_stream", data = """{"id": 1, "$cursorName": 1, "name": "foo_$cursorName"}""", // this is unrealistic (extractedAt should always increase between syncs), // but it lets us force the dedupe behavior to rely solely on the cursor column, @@ -1503,10 +1491,9 @@ abstract class BasicFunctionalityIntegrationTest( @Test open fun testBasicTypes() { assumeTrue(verifyDataWriting) - val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType( linkedMapOf( @@ -1538,7 +1525,7 @@ abstract class BasicFunctionalityIntegrationTest( fun makeRecord(data: String) = InputRecord( randomizedNamespace, - streamName, + "test_stream", data, emittedAtMs = 100, ) @@ -2304,10 +2291,9 @@ abstract class BasicFunctionalityIntegrationTest( * happens sometimes. */ open fun testNoColumns() { - val streamName = generateStreamName() val stream = DestinationStream( - DestinationStream.Descriptor(randomizedNamespace, streamName), + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), Append, ObjectType(linkedMapOf()), generationId = 42, @@ -2320,7 +2306,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( InputRecord( randomizedNamespace, - streamName, + "test_stream", """{"foo": "bar"}""", emittedAtMs = 1000L, ) @@ -2347,11 +2333,6 @@ abstract class BasicFunctionalityIntegrationTest( ) } - @Suppress("DEPRECATION") - private fun generateStreamName(): String { - return "test_stream_${RandomStringUtils.randomAlphabetic(4)}" - } - companion object { private val intType = FieldType(IntegerType, nullable = true) private val numberType = FieldType(NumberType, nullable = true) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt index dcda5d2aa7e2..2d4419ec8ead 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt @@ -9,9 +9,7 @@ import io.airbyte.cdk.load.check.CheckTestConfig import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.TestInstance -@TestInstance(TestInstance.Lifecycle.PER_CLASS) internal class MSSQLCheckTest : CheckIntegrationTest( successConfigFilenames = @@ -28,13 +26,17 @@ internal class MSSQLCheckTest : configUpdater = MSSQLConfigUpdater() ) { - @BeforeAll - fun beforeAll() { - MSSQLContainerHelper.start() - } + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } - @AfterAll - fun afterAll() { - MSSQLContainerHelper.stop() + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt index 9afe343aff3b..e6cf196bec70 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt @@ -23,7 +23,6 @@ import java.time.Instant import java.util.UUID import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll -import org.junit.jupiter.api.TestInstance abstract class MSSQLWriterTest( configPath: String, @@ -135,21 +134,24 @@ class MSSQLDataCleaner : DestinationCleaner { } } -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -class StandardInsert : +internal class StandardInsert : MSSQLWriterTest( "check/valid.json", MSSQLDataDumper(), MSSQLDataCleaner(), ) { - @BeforeAll - fun beforeAll() { - MSSQLContainerHelper.start() - } + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } - @AfterAll - fun afterAll() { - MSSQLContainerHelper.stop() + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } } }