Skip to content

Commit

Permalink
[source-mysql-v2] Add extra check logic for CDC (#45916)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Sep 27, 2024
1 parent b82a05a commit 4beba6c
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.10
dockerImageTag: 0.0.11
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.check.JdbcCheckQueries
import io.airbyte.cdk.command.SourceConfiguration
Expand All @@ -15,7 +16,9 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
import java.sql.Connection
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement

private val log = KotlinLogging.logger {}
Expand All @@ -25,6 +28,67 @@ class MysqlSourceMetadataQuerier(
val base: JdbcMetadataQuerier,
) : MetadataQuerier by base {

/**
 * Runs the delegate's extra checks and then, when `base.config.global` is set (CDC mode),
 * verifies that the MySQL server is usable for CDC:
 * - `log_bin` must be `ON`, `binlog_format` must be `ROW`, `binlog_row_image` must be `FULL`;
 * - the binary log status must be readable by the configured user.
 *
 * @throws ConfigErrorException if a variable has an unexpected value or if neither binary log
 * status statement can be executed.
 */
override fun extraChecks() {
    base.extraChecks()
    // Everything below only applies to CDC.
    if (!base.config.global) return

    // Each entry pairs a SHOW VARIABLES query with the value it is required to return.
    val cdcVariableCheckQueries: List<Pair<String, String>> =
        listOf(
            "show variables where Variable_name = 'log_bin'" to "ON",
            "show variables where Variable_name = 'binlog_format'" to "ROW",
            "show variables where Variable_name = 'binlog_row_image'" to "FULL",
        )

    cdcVariableCheckQueries.forEach { (sql, expectedValue) ->
        runVariableCheckSql(sql, expectedValue, base.conn)
    }

    // Note: SHOW MASTER STATUS is deprecated in recent MySQL versions (8.4 and later), where
    // SHOW BINARY LOG STATUS replaces it. Try the old statement first and fall back to the new
    // one; only when both fail is the check considered failed.
    try {
        base.conn.createStatement().use { stmt: Statement ->
            stmt.execute("SHOW MASTER STATUS")
        }
    } catch (e: SQLException) {
        // Expected on servers that no longer support the old statement; keep a trace of the
        // reason instead of dropping it silently.
        log.debug { "SHOW MASTER STATUS failed (${e.message}), trying SHOW BINARY LOG STATUS" }
        try {
            base.conn.createStatement().use { stmt: Statement ->
                stmt.execute("SHOW BINARY LOG STATUS")
            }
        } catch (ex: SQLException) {
            log.debug { "SHOW BINARY LOG STATUS failed (${ex.message})" }
            throw ConfigErrorException(
                "Please grant REPLICATION CLIENT privilege, so that binary log files are available for CDC mode."
            )
        }
    }
}

/**
 * Executes [sql] on [conn] and verifies that it returns exactly one row whose `Value` column
 * equals [expectedValue], ignoring case.
 *
 * @param sql query expected to return a single row with a `Value` column.
 * @param expectedValue required content of the `Value` column.
 * @param conn connection used to run the query; it is not closed here.
 * @throws ConfigErrorException if the query returns no row or more than one row, if the value
 * differs from [expectedValue], or if running the query fails for any other reason.
 */
private fun runVariableCheckSql(sql: String, expectedValue: String, conn: Connection) {
    try {
        conn.createStatement().use { stmt: Statement ->
            stmt.executeQuery(sql).use { rs: ResultSet ->
                if (!rs.next()) {
                    throw ConfigErrorException("Could not query the variable $sql")
                }
                // getString returns null for SQL NULL, so keep the value nullable rather than
                // failing on an implicit non-null assertion.
                val resultValue: String? = rs.getString("Value")
                if (!resultValue.equals(expectedValue, ignoreCase = true)) {
                    // Name the query so the user can tell which variable is misconfigured.
                    throw ConfigErrorException(
                        "The variable queried by \"$sql\" should be set to " +
                            "\"$expectedValue\", but it is \"$resultValue\"",
                    )
                }
                // A second row means the query did not identify a single variable.
                if (rs.next()) {
                    throw ConfigErrorException("Could not query the variable $sql")
                }
            }
        }
    } catch (e: ConfigErrorException) {
        // Already a user-facing configuration error: propagate it unchanged instead of
        // re-wrapping it in the generic handler below.
        throw e
    } catch (e: Exception) {
        throw ConfigErrorException("Check query failed with: ${e.message}")
    }
}

override fun fields(streamID: StreamIdentifier): List<Field> {
val table: TableName = findTableName(streamID) ?: return listOf()
if (table !in base.memoizedColumnMetadata) return listOf()
Expand Down Expand Up @@ -75,8 +139,8 @@ class MysqlSourceMetadataQuerier(
.groupBy {
findTableName(
StreamIdentifier.from(
StreamDescriptor().withName(it.tableName).withNamespace("public")
)
StreamDescriptor().withName(it.tableName).withNamespace("public"),
),
)
}
.mapNotNull { (table, rowsByTable) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import io.airbyte.cdk.jdbc.StringFieldType
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.CatalogHelpers
Expand All @@ -26,13 +28,57 @@ import io.airbyte.protocol.models.v0.SyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.Connection
import java.sql.Statement
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.testcontainers.containers.MySQLContainer

class MysqlCdcIntegrationTest {

/**
 * Exercises the `check` operation twice:
 * 1. with the configuration returned by `config()`, expecting a single message whose
 *    connection status is SUCCEEDED;
 * 2. with a CDC cursor configured against a container started with
 *    [MysqlContainerFactory.WithCdcOff], expecting a FAILED connection status.
 */
@Test
fun testCheck() {
    val run1: BufferingOutputConsumer = CliRunner.source("check", config(), null).run()

    // JUnit's assertEquals takes (expected, actual).
    assertEquals(1, run1.messages().size)
    assertEquals(
        AirbyteConnectionStatus.Status.SUCCEEDED,
        run1.messages().first().connectionStatus.status,
    )

    MysqlContainerFactory.exclusive(
            imageName = "mysql:8.0",
            MysqlContainerFactory.WithCdcOff,
        )
        .use { nonCdcDbContainer ->
            // The statements must sit directly in this lambda. Wrapping them in another pair
            // of braces would only build a nested lambda that is never invoked, and the
            // assertions below would silently not run.
            val invalidConfig: MysqlSourceConfigurationJsonObject =
                MysqlContainerFactory.config(nonCdcDbContainer).apply {
                    setCursorMethodValue(CdcCursor())
                }

            val nonCdcConnectionFactory =
                JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(invalidConfig))

            provisionTestContainer(nonCdcDbContainer, nonCdcConnectionFactory)

            val run2: BufferingOutputConsumer =
                CliRunner.source("check", invalidConfig, null).run()

            // Other message types may be emitted too; pick the connection status message.
            val messageInRun2 =
                run2.messages().first { it.type == AirbyteMessage.Type.CONNECTION_STATUS }

            assertEquals(
                AirbyteConnectionStatus.Status.FAILED,
                messageInRun2.connectionStatus.status,
            )
        }
}

@Test
fun test() {
val run1: BufferingOutputConsumer =
Expand Down Expand Up @@ -99,12 +145,20 @@ class MysqlCdcIntegrationTest {
imageName = "mysql:8.0",
MysqlContainerFactory.WithNetwork,
)
provisionTestContainer(dbContainer, connectionFactory)
}

fun provisionTestContainer(
targetContainer: MySQLContainer<*>,
targetConnectionFactory: JdbcConnectionFactory
) {
val grant =
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT " +
"ON *.* TO '${dbContainer.username}'@'%';"
dbContainer.execAsRoot(grant)
dbContainer.execAsRoot("FLUSH PRIVILEGES;")
connectionFactory.get().use { connection: Connection ->
"ON *.* TO '${targetContainer.username}'@'%';"
targetContainer.execAsRoot(grant)
targetContainer.execAsRoot("FLUSH PRIVILEGES;")

targetConnectionFactory.get().use { connection: Connection ->
connection.isReadOnly = false
connection.createStatement().use { stmt: Statement ->
stmt.execute("CREATE TABLE test.tbl(k INT PRIMARY KEY, v VARCHAR(80))")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ object MysqlContainerFactory {
}
}

/**
 * Container modifier that starts the MySQL server with the `--skip-log-bin` command-line
 * option, i.e. with binary logging turned off.
 */
data object WithCdcOff : MysqlContainerModifier {
    override fun modify(container: MySQLContainer<*>) {
        val skipBinaryLogOption = "--skip-log-bin"
        container.withCommand(skipBinaryLogOption)
    }
}

fun exclusive(
imageName: String,
vararg modifiers: MysqlContainerModifier,
Expand Down

0 comments on commit 4beba6c

Please sign in to comment.