Skip to content

Commit

Permalink
fix: use testcontainer for write tests (#51594)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Jan 17, 2025
1 parent f28f532 commit f5becbb
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ abstract class BasicFunctionalityIntegrationTest(
configUpdater = configUpdater,
envVars = envVars,
) {
val parsedConfig =
ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents))

// Update config with any replacements. This may be necessary when using testcontainers.
val configAsString = configUpdater.update(configContents)
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)

@Test
open fun testBasicWrite() {
Expand All @@ -156,7 +158,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -207,7 +209,7 @@ abstract class BasicFunctionalityIntegrationTest(
{
if (verifyDataWriting) {
dumpAndDiffRecords(
ValidatedJsonUtils.parseOne(configSpecClass, configContents),
ValidatedJsonUtils.parseOne(configSpecClass, configAsString),
listOf(
OutputRecord(
extractedAt = 1234,
Expand Down Expand Up @@ -268,7 +270,7 @@ abstract class BasicFunctionalityIntegrationTest(

val messages =
runSync(
configContents,
configAsString,
stream,
listOf(
InputFile(
Expand Down Expand Up @@ -306,7 +308,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
})

val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString)
val fileContent = dataDumper.dumpFile(config, stream)

assertEquals(listOf("123"), fileContent)
Expand All @@ -327,7 +329,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stateMessage =
runSyncUntilStateAck(
configContents,
this@BasicFunctionalityIntegrationTest.configContents,
stream,
listOf(
InputRecord(
Expand All @@ -345,7 +347,7 @@ abstract class BasicFunctionalityIntegrationTest(
),
allowGracefulShutdown = false,
)
runSync(configContents, stream, emptyList())
runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList())

val streamName = stateMessage.stream.streamDescriptor.name
val streamNamespace = stateMessage.stream.streamDescriptor.namespace
Expand Down Expand Up @@ -408,7 +410,7 @@ abstract class BasicFunctionalityIntegrationTest(
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
runSync(
configContents,
configAsString,
DestinationCatalog(
listOf(
stream1,
Expand Down Expand Up @@ -531,7 +533,7 @@ abstract class BasicFunctionalityIntegrationTest(
serialized = "",
)
}
runSync(configContents, catalog, messages)
runSync(configAsString, catalog, messages)
assertAll(
catalog.streams.map { stream ->
{
Expand Down Expand Up @@ -571,7 +573,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42),
listOf(
InputRecord(
Expand All @@ -584,7 +586,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -666,7 +668,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -705,7 +707,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a status incomplete. This should not delete any existing data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -754,7 +756,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a third sync, this time with a successful status.
// This should delete the first sync's data, and retain the second+third syncs' data.
runSync(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -835,7 +837,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a stream status incomplete.
runSyncUntilStateAck(
configContents,
configAsString,
stream,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -870,7 +872,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a second sync, this time with a successful status.
// This should retain the first syncs' data.
runSync(
configContents,
configAsString,
stream,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -956,7 +958,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
configAsString,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -996,7 +998,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a sync, but emit a stream status incomplete. This should not delete any existing
// data.
runSyncUntilStateAck(
configContents,
configAsString,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -1051,7 +1053,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 43,
)
runSync(
configContents,
configAsString,
stream3,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1111,7 +1113,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(syncId = 42),
listOf(
InputRecord(
Expand All @@ -1124,7 +1126,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1177,7 +1179,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
configAsString,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType)
Expand All @@ -1197,7 +1199,7 @@ abstract class BasicFunctionalityIntegrationTest(
linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType)
)
runSync(
configContents,
configAsString,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1271,7 +1273,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync1Stream = makeStream(syncId = 42)
runSync(
configContents,
configAsString,
sync1Stream,
listOf(
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
Expand Down Expand Up @@ -1332,7 +1334,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync2Stream = makeStream(syncId = 43)
runSync(
configContents,
configAsString,
sync2Stream,
listOf(
// Update both Alice and Bob
Expand Down Expand Up @@ -1416,9 +1418,9 @@ abstract class BasicFunctionalityIntegrationTest(
// instead of being able to fallback onto extractedAt.
emittedAtMs = 100,
)
runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1")))
runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1")))
val stream2 = makeStream("cursor2")
runSync(configContents, stream2, listOf(makeRecord("cursor2")))
runSync(configAsString, stream2, listOf(makeRecord("cursor2")))
dumpAndDiffRecords(
parsedConfig,
listOf(
Expand Down Expand Up @@ -1475,7 +1477,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
// Just verify that we don't crash.
assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) }
assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) }
}

/**
Expand Down Expand Up @@ -1528,7 +1530,7 @@ abstract class BasicFunctionalityIntegrationTest(
emittedAtMs = 100,
)
runSync(
configContents,
configAsString,
stream,
listOf(
// A record with valid values for all fields
Expand Down Expand Up @@ -1812,7 +1814,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1975,7 +1977,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2146,7 +2148,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2299,7 +2301,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
configAsString,
stream,
listOf(
InputRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ data class MSSQLConfiguration(
class MSSQLConfigurationFactory :
DestinationConfigurationFactory<MSSQLSpecification, MSSQLConfiguration> {
override fun makeWithoutExceptionHandling(pojo: MSSQLSpecification): MSSQLConfiguration {
return makeWithOverrides(spec = pojo)
}

fun makeWithOverrides(
spec: MSSQLSpecification,
overrides: Map<String, String> = emptyMap()
): MSSQLConfiguration {
return MSSQLConfiguration(
host = pojo.host,
port = pojo.port,
database = pojo.database,
schema = pojo.schema,
user = pojo.user,
password = pojo.password,
jdbcUrlParams = pojo.jdbcUrlParams,
rawDataSchema = pojo.rawDataSchema,
sslMethod = pojo.sslMethod,
host = overrides.getOrDefault("host", spec.host),
port = overrides.getOrDefault("port", spec.port.toString()).toInt(),
database = overrides.getOrDefault("database", spec.database),
schema = overrides.getOrDefault("schema", spec.schema),
user = overrides.getOrDefault("user", spec.user),
password = overrides.getOrDefault("password", spec.password),
jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams),
rawDataSchema = overrides.getOrDefault("rawDataSchema", spec.rawDataSchema),
sslMethod = spec.sslMethod,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MSSQLCheckTest :
CheckIntegrationTest<MSSQLSpecification>(
successConfigFilenames =
Expand All @@ -25,16 +23,20 @@ internal class MSSQLCheckTest :
MSSQLTestConfigUtil.getConfigPath("check/fail-internal-schema-invalid.json")
) to "\"iamnotthere\" either does not exist".toPattern(),
),
configUpdater = MSSQLContainerHelper
configUpdater = MSSQLConfigUpdater()
) {

@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}
companion object {
@JvmStatic
@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}

@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
@JvmStatic
@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
}
}
}
Loading

0 comments on commit f5becbb

Please sign in to comment.