Commit

PR feedback
jdpgrailsdev committed Jan 17, 2025
1 parent 615f2a7, commit 46c0e54
Showing 3 changed files with 57 additions and 72 deletions.
@@ -61,7 +61,6 @@ import java.time.OffsetDateTime
import java.time.OffsetTime
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions.assertDoesNotThrow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
@@ -148,10 +147,9 @@ abstract class BasicFunctionalityIntegrationTest(

@Test
open fun testBasicWrite() {
val streamName = generateStreamName()
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
@@ -165,7 +163,7 @@
listOf(
InputRecord(
namespace = randomizedNamespace,
name = streamName,
name = "test_stream",
data = """{"id": 5678, "undeclared": "asdf"}""",
emittedAtMs = 1234,
changes =
@@ -180,7 +178,7 @@
)
),
InputStreamCheckpoint(
streamName = streamName,
streamName = "test_stream",
streamNamespace = randomizedNamespace,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
@@ -198,7 +196,7 @@
)
assertEquals(
StreamCheckpoint(
streamName = streamName,
streamName = "test_stream",
streamNamespace = randomizedNamespace,
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
@@ -320,10 +318,9 @@
@Test
open fun testMidSyncCheckpointingStreamState(): Unit =
runBlocking(Dispatchers.IO) {
val streamName = generateStreamName()
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
@@ -337,22 +334,22 @@
listOf(
InputRecord(
namespace = randomizedNamespace,
name = streamName,
name = "test_stream",
data = """{"id": 12}""",
emittedAtMs = 1234,
)
),
StreamCheckpoint(
streamNamespace = randomizedNamespace,
streamName = streamName,
streamName = "test_stream",
blob = """{"foo": "bar1"}""",
sourceRecordCount = 1
),
allowGracefulShutdown = false,
)
runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList())

val actualStreamName = stateMessage.stream.streamDescriptor.name
val streamName = stateMessage.stream.streamDescriptor.name
val streamNamespace = stateMessage.stream.streamDescriptor.namespace
// basic state message checks - this is mostly just exercising the CDK itself,
// but is cheap and easy to do.
@@ -361,8 +358,8 @@
{
assertEquals(
streamName,
actualStreamName,
"Expected stream name to be $streamName, got $actualStreamName"
"test_stream",
"Expected stream name to be test_stream, got $streamName"
)
},
{
@@ -401,10 +398,9 @@
@Test
open fun testNamespaces() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeStream(namespace: String) =
DestinationStream(
DestinationStream.Descriptor(namespace, streamName),
DestinationStream.Descriptor(namespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType)),
generationId = 0,
@@ -567,10 +563,9 @@
@Test
open fun testTruncateRefresh() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeStream(generationId: Long, minimumGenerationId: Long, syncId: Long) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType, "name" to stringType)),
generationId,
@@ -583,7 +578,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "name": "first_value"}""",
emittedAtMs = 1234L,
)
@@ -596,7 +591,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "name": "second_value"}""",
emittedAtMs = 1234,
)
@@ -631,11 +626,10 @@
@Test
open fun testInterruptedTruncateWithPriorData() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
emittedAtMs = extractedAt,
)
@@ -660,7 +654,7 @@
// Run a normal sync with nonempty data
val stream1 =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(
linkedMapOf(
@@ -801,11 +795,10 @@
@Test
open fun testInterruptedTruncateWithoutPriorData() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
emittedAtMs = extractedAt,
)
@@ -829,7 +822,7 @@
)
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(
linkedMapOf(
@@ -923,11 +916,10 @@
@Test
open fun resumeAfterCancelledTruncate() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeInputRecord(id: Int, updatedAt: String, extractedAt: Long) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": $id, "updated_at": "$updatedAt", "name": "foo_${id}_$extractedAt"}""",
emittedAtMs = extractedAt,
)
@@ -952,7 +944,7 @@
// Run a normal sync with nonempty data
val stream1 =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(
linkedMapOf(
@@ -1111,10 +1103,9 @@
@Test
open fun testAppend() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeStream(syncId: Long) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf("id" to intType, "name" to stringType)),
generationId = 0,
@@ -1127,7 +1118,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "name": "first_value"}""",
emittedAtMs = 1234L,
)
@@ -1140,7 +1131,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "name": "second_value"}""",
emittedAtMs = 5678L,
)
@@ -1178,10 +1169,9 @@
@Test
open fun testAppendSchemaEvolution() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
fun makeStream(syncId: Long, schema: LinkedHashMap<String, FieldType>) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(schema),
generationId = 0,
@@ -1197,7 +1187,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "to_drop": "val1", "to_change": 42}""",
emittedAtMs = 1234L,
)
@@ -1214,7 +1204,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"id": 42, "to_change": "val2", "to_add": "val3"}""",
emittedAtMs = 1234,
)
@@ -1250,10 +1240,9 @@
@Test
open fun testDedup() {
assumeTrue(supportsDedup)
val streamName = generateStreamName()
fun makeStream(syncId: Long) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
importType =
Dedupe(
primaryKey = listOf(listOf("id1"), listOf("id2")),
@@ -1277,7 +1266,7 @@
fun makeRecord(data: String, extractedAt: Long) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
data,
emittedAtMs = extractedAt,
)
@@ -1400,10 +1389,9 @@
@Test
open fun testDedupChangeCursor() {
assumeTrue(verifyDataWriting && supportsDedup)
val streamName = generateStreamName()
fun makeStream(cursor: String) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Dedupe(
primaryKey = listOf(listOf("id")),
cursor = listOf(cursor),
@@ -1423,7 +1411,7 @@
fun makeRecord(cursorName: String) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
data = """{"id": 1, "$cursorName": 1, "name": "foo_$cursorName"}""",
// this is unrealistic (extractedAt should always increase between syncs),
// but it lets us force the dedupe behavior to rely solely on the cursor column,
@@ -1503,10 +1491,9 @@
@Test
open fun testBasicTypes() {
assumeTrue(verifyDataWriting)
val streamName = generateStreamName()
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(
linkedMapOf(
Expand Down Expand Up @@ -1538,7 +1525,7 @@ abstract class BasicFunctionalityIntegrationTest(
fun makeRecord(data: String) =
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
data,
emittedAtMs = 100,
)
@@ -2304,10 +2291,9 @@
* happens sometimes.
*/
open fun testNoColumns() {
val streamName = generateStreamName()
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, streamName),
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf()),
generationId = 42,
@@ -2320,7 +2306,7 @@
listOf(
InputRecord(
randomizedNamespace,
streamName,
"test_stream",
"""{"foo": "bar"}""",
emittedAtMs = 1000L,
)
Expand All @@ -2347,11 +2333,6 @@ abstract class BasicFunctionalityIntegrationTest(
)
}

@Suppress("DEPRECATION")
private fun generateStreamName(): String {
return "test_stream_${RandomStringUtils.randomAlphabetic(4)}"
}

companion object {
private val intType = FieldType(IntegerType, nullable = true)
private val numberType = FieldType(NumberType, nullable = true)
@@ -9,9 +9,7 @@ import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MSSQLCheckTest :
CheckIntegrationTest<MSSQLSpecification>(
successConfigFilenames =
@@ -28,13 +26,17 @@ internal class MSSQLCheckTest :
configUpdater = MSSQLConfigUpdater()
) {

@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}
companion object {
@JvmStatic
@BeforeAll
fun beforeAll() {
MSSQLContainerHelper.start()
}

@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
@JvmStatic
@AfterAll
fun afterAll() {
MSSQLContainerHelper.stop()
}
}
}
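
Note on the MSSQLCheckTest change: JUnit 5 only accepts @BeforeAll and @AfterAll on static methods unless the class is annotated with @TestInstance(Lifecycle.PER_CLASS). With that annotation removed, the Kotlin idiom is to move the hooks into a companion object and mark them @JvmStatic, which is what the diff above does. A minimal, self-contained sketch of the pattern follows; the container helper is a hypothetical stand-in for MSSQLContainerHelper, not the actual class from this repository.

import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test

internal class ContainerLifecycleTest {

    @Test
    fun usesSharedContainer() {
        // Tests in this class can assume the container started in beforeAll() is running.
    }

    companion object {
        @JvmStatic
        @BeforeAll
        fun beforeAll() {
            // Start the shared container once, before any test in this class runs
            // (stand-in for MSSQLContainerHelper.start()).
        }

        @JvmStatic
        @AfterAll
        fun afterAll() {
            // Stop the shared container after the last test has finished
            // (stand-in for MSSQLContainerHelper.stop()).
        }
    }
}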