diff --git a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml index 1252b18dfbd7..433ec6a75fde 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c - dockerImageTag: 0.0.10 + dockerImageTag: 0.0.11 dockerRepository: airbyte/source-mysql-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql-v2 diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt index b4792ee35b48..91e6c46dbb44 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt @@ -1,6 +1,7 @@ /* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.source.mysql +import io.airbyte.cdk.ConfigErrorException import io.airbyte.cdk.StreamIdentifier import io.airbyte.cdk.check.JdbcCheckQueries import io.airbyte.cdk.command.SourceConfiguration @@ -15,7 +16,9 @@ import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Primary import jakarta.inject.Singleton +import java.sql.Connection import java.sql.ResultSet +import java.sql.SQLException import java.sql.Statement private val log = KotlinLogging.logger {} @@ -25,6 +28,67 @@ class MysqlSourceMetadataQuerier( val base: JdbcMetadataQuerier, ) : MetadataQuerier by base { + override fun extraChecks() { + base.extraChecks() + if (base.config.global) { + // Extra checks for CDC + var cdcVariableCheckQueries: List> = + listOf( + Pair("show variables where Variable_name = 'log_bin'", "ON"), + Pair("show variables where Variable_name = 'binlog_format'", "ROW"), + Pair("show variables where Variable_name = 'binlog_row_image'", "FULL"), + ) + + cdcVariableCheckQueries.forEach { runVariableCheckSql(it.first, it.second, base.conn) } + + // Note: SHOW MASTER STATUS has been deprecated in latest mysql (8.4) and going forward + // it should be SHOW BINARY LOG STATUS. We will run both - if both have been failed we + // will throw exception. + try { + base.conn.createStatement().use { stmt: Statement -> + stmt.execute("SHOW MASTER STATUS") + } + } catch (e: SQLException) { + try { + base.conn.createStatement().use { stmt: Statement -> + stmt.execute("SHOW BINARY LOG STATUS") + } + } catch (ex: SQLException) { + throw ConfigErrorException( + "Please grant REPLICATION CLIENT privilege, so that binary log files are available for CDC mode." + ) + } + } + } + } + + private fun runVariableCheckSql(sql: String, expectedValue: String, conn: Connection) { + try { + conn.createStatement().use { stmt: Statement -> + stmt.executeQuery(sql).use { rs: ResultSet -> + if (!rs.next()) { + throw ConfigErrorException("Could not query the variable $sql") + } + val resultValue: String = rs.getString("Value") + if (!resultValue.equals(expectedValue, ignoreCase = true)) { + throw ConfigErrorException( + String.format( + "The variable should be set to \"%s\", but it is \"%s\"", + expectedValue, + resultValue, + ), + ) + } + if (rs.next()) { + throw ConfigErrorException("Could not query the variable $sql") + } + } + } + } catch (e: Exception) { + throw ConfigErrorException("Check query failed with: ${e.message}") + } + } + override fun fields(streamID: StreamIdentifier): List { val table: TableName = findTableName(streamID) ?: return listOf() if (table !in base.memoizedColumnMetadata) return listOf() @@ -75,8 +139,8 @@ class MysqlSourceMetadataQuerier( .groupBy { findTableName( StreamIdentifier.from( - StreamDescriptor().withName(it.tableName).withNamespace("public") - ) + StreamDescriptor().withName(it.tableName).withNamespace("public"), + ), ) } .mapNotNull { (table, rowsByTable) -> diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt index 3d1905a3c4c2..a03c73d806e6 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt @@ -16,6 +16,8 @@ import io.airbyte.cdk.jdbc.StringFieldType import io.airbyte.cdk.output.BufferingOutputConsumer import io.airbyte.cdk.util.Jsons import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.CatalogHelpers @@ -26,6 +28,7 @@ import io.airbyte.protocol.models.v0.SyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.sql.Connection import java.sql.Statement +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout @@ -33,6 +36,49 @@ import org.testcontainers.containers.MySQLContainer class MysqlCdcIntegrationTest { + @Test + fun testCheck() { + val run1: BufferingOutputConsumer = CliRunner.source("check", config(), null).run() + + assertEquals(run1.messages().size, 1) + assertEquals( + run1.messages().first().connectionStatus.status, + AirbyteConnectionStatus.Status.SUCCEEDED + ) + + MysqlContainerFactory.exclusive( + imageName = "mysql:8.0", + MysqlContainerFactory.WithCdcOff, + ) + .use { nonCdcDbContainer -> + { + val invalidConfig: MysqlSourceConfigurationJsonObject = + MysqlContainerFactory.config(nonCdcDbContainer).apply { + setCursorMethodValue(CdcCursor()) + } + + val nonCdcConnectionFactory = + JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(invalidConfig)) + + provisionTestContainer(nonCdcDbContainer, nonCdcConnectionFactory) + + val run2: BufferingOutputConsumer = + CliRunner.source("check", invalidConfig, null).run() + + val messageInRun2 = + run2 + .messages() + .filter { it.type == AirbyteMessage.Type.CONNECTION_STATUS } + .first() + + assertEquals( + AirbyteConnectionStatus.Status.FAILED, + messageInRun2.connectionStatus.status + ) + } + } + } + @Test fun test() { val run1: BufferingOutputConsumer = @@ -99,12 +145,20 @@ class MysqlCdcIntegrationTest { imageName = "mysql:8.0", MysqlContainerFactory.WithNetwork, ) + provisionTestContainer(dbContainer, connectionFactory) + } + + fun provisionTestContainer( + targetContainer: MySQLContainer<*>, + targetConnectionFactory: JdbcConnectionFactory + ) { val grant = "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT " + - "ON *.* TO '${dbContainer.username}'@'%';" - dbContainer.execAsRoot(grant) - dbContainer.execAsRoot("FLUSH PRIVILEGES;") - connectionFactory.get().use { connection: Connection -> + "ON *.* TO '${targetContainer.username}'@'%';" + targetContainer.execAsRoot(grant) + targetContainer.execAsRoot("FLUSH PRIVILEGES;") + + targetConnectionFactory.get().use { connection: Connection -> connection.isReadOnly = false connection.createStatement().use { stmt: Statement -> stmt.execute("CREATE TABLE test.tbl(k INT PRIMARY KEY, v VARCHAR(80))") diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt index 6cb03843f5fe..ecd9ff481dfd 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt @@ -25,6 +25,12 @@ object MysqlContainerFactory { } } + data object WithCdcOff : MysqlContainerModifier { + override fun modify(container: MySQLContainer<*>) { + container.withCommand("--skip-log-bin") + } + } + fun exclusive( imageName: String, vararg modifiers: MysqlContainerModifier,