Skip to content

Commit

Permalink
fix: use testcontainer for write tests (#51594)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jan 17, 2025
1 parent 5339a2e commit 28005f5
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ abstract class BasicFunctionalityIntegrationTest(
configUpdater = configUpdater,
envVars = envVars,
) {
val parsedConfig =
ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents))

// Update config with any replacements. This may be necessary when using testcontainers.
val configAsString = configUpdater.update(configContents)
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)

@Test
open fun testBasicWrite() {
Expand All @@ -209,7 +211,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -260,7 +262,7 @@ abstract class BasicFunctionalityIntegrationTest(
{
if (verifyDataWriting) {
dumpAndDiffRecords(
ValidatedJsonUtils.parseOne(configSpecClass, configContents),
ValidatedJsonUtils.parseOne(configSpecClass, configAsString),
listOf(
OutputRecord(
extractedAt = 1234,
Expand Down Expand Up @@ -321,7 +323,7 @@ abstract class BasicFunctionalityIntegrationTest(

val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputFile(
Expand Down Expand Up @@ -359,7 +361,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
})

val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)
val fileContent = dataDumper.dumpFile(config, stream)

assertEquals(listOf("123"), fileContent)
Expand All @@ -380,7 +382,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stateMessage =
runSyncUntilStateAck(
configContents,
this@BasicFunctionalityIntegrationTest.configContents,
stream,
listOf(
InputRecord(
Expand All @@ -398,7 +400,7 @@ abstract class BasicFunctionalityIntegrationTest(
),
allowGracefulShutdown = false,
)
runSync(configContents, stream, emptyList())
runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList())

val streamName = stateMessage.stream.streamDescriptor.name
val streamNamespace = stateMessage.stream.streamDescriptor.namespace
Expand Down Expand Up @@ -461,7 +463,7 @@ abstract class BasicFunctionalityIntegrationTest(
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
runSync(
configContents,
configAsString,
DestinationCatalog(
listOf(
stream1,
Expand Down Expand Up @@ -584,7 +586,7 @@ abstract class BasicFunctionalityIntegrationTest(
serialized = "",
)
}
runSync(configContents, catalog, messages)
runSync(configAsString, catalog, messages)
assertAll(
catalog.streams.map { stream ->
{
Expand Down Expand Up @@ -624,7 +626,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42),
listOf(
InputRecord(
Expand All @@ -637,7 +639,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -719,7 +721,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -758,7 +760,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a status incomplete. This should not delete any existing data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -807,7 +809,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a third sync, this time with a successful status.
// This should delete the first sync's data, and retain the second+third syncs' data.
runSync(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -888,7 +890,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a stream status incomplete.
runSyncUntilStateAck(
configContents,
configAsString,
stream,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -923,7 +925,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a second sync, this time with a successful status.
// This should retain the first syncs' data.
runSync(
configContents,
configAsString,
stream,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1009,7 +1011,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -1049,7 +1051,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a sync, but emit a stream status incomplete. This should not delete any existing
// data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -1104,7 +1106,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 43,
)
runSync(
configContents,
configAsString,
stream3,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1164,7 +1166,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(syncId = 42),
listOf(
InputRecord(
Expand All @@ -1177,7 +1179,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1230,7 +1232,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType)
Expand All @@ -1250,7 +1252,7 @@ abstract class BasicFunctionalityIntegrationTest(
linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType)
)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1324,7 +1326,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync1Stream = makeStream(syncId = 42)
runSync(
configContents,
configAsString,
sync1Stream,
listOf(
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
Expand Down Expand Up @@ -1385,7 +1387,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync2Stream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
sync2Stream,
listOf(
// Update both Alice and Bob
Expand Down Expand Up @@ -1469,9 +1471,9 @@ abstract class BasicFunctionalityIntegrationTest(
// instead of being able to fallback onto extractedAt.
emittedAtMs = 100,
)
runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1")))
runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1")))
val stream2 = makeStream("cursor2")
runSync(configContents, stream2, listOf(makeRecord("cursor2")))
runSync(configAsString, stream2, listOf(makeRecord("cursor2")))
dumpAndDiffRecords(
parsedConfig,
listOf(
Expand Down Expand Up @@ -1528,7 +1530,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
// Just verify that we don't crash.
assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) }
assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) }
}

/**
Expand Down Expand Up @@ -1581,7 +1583,7 @@ abstract class BasicFunctionalityIntegrationTest(
emittedAtMs = 100,
)
runSync(
configContents,
configAsString,
stream,
listOf(
// A record with valid values for all fields
Expand Down Expand Up @@ -1865,7 +1867,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2044,7 +2046,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2215,7 +2217,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2407,7 +2409,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ data class MSSQLConfiguration(
class MSSQLConfigurationFactory :
DestinationConfigurationFactory<MSSQLSpecification, MSSQLConfiguration> {
override fun makeWithoutExceptionHandling(pojo: MSSQLSpecification): MSSQLConfiguration {
return makeWithOverrides(spec = pojo)
}

fun makeWithOverrides(
spec: MSSQLSpecification,
overrides: Map<String, String> = emptyMap()
): MSSQLConfiguration {
return MSSQLConfiguration(
host = pojo.host,
port = pojo.port,
database = pojo.database,
schema = pojo.schema,
user = pojo.user,
password = pojo.password,
jdbcUrlParams = pojo.jdbcUrlParams,
rawDataSchema = pojo.rawDataSchema,
sslMethod = pojo.sslMethod,
host = overrides.getOrDefault("host", spec.host),
port = overrides.getOrDefault("port", spec.port.toString()).toInt(),
database = overrides.getOrDefault("database", spec.database),
schema = overrides.getOrDefault("schema", spec.schema),
user = overrides.getOrDefault("user", spec.user),
password = overrides.getOrDefault("password", spec.password),
jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams),
rawDataSchema = overrides.getOrDefault("rawDataSchema", spec.rawDataSchema),
sslMethod = spec.sslMethod,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MSSQLCheckTest :
CheckIntegrationTest<MSSQLSpecification>(
successConfigFilenames =
Expand All @@ -25,16 +23,20 @@ internal class MSSQLCheckTest :
MSSQLTestConfigUtil.getConfigPath("check/fail-internal-schema-invalid.json")
) to "\"iamnotthere\" either does not exist".toPattern(),
),
configUpdater = MSSQLContainerHelper
configUpdater = MSSQLConfigUpdater()
) {

@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}
companion object {
@JvmStatic
@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}

@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
@JvmStatic
@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
}
}
}
Loading

0 comments on commit 28005f5

Please sign in to comment.