Merge branch 'master' into btkcodedev/googlePageSpeedInsightsCompatable

btkcodedev authored May 15, 2024
2 parents 78516da + 1c3a6c4 commit db72ff7

Showing 401 changed files with 23,670 additions and 8,609 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -4,6 +4,7 @@
/airbyte-integrations/connectors/destination-milvus @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-qdrant @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-chroma @airbytehq/ai-language-models
/airbyte-integrations/connectors/destination-snowflake-cortex @airbytehq/ai-language-models
/airbyte-cdk/python/airbyte_cdk/destinations/vector_db_based @airbytehq/ai-language-models

# CI/CD
32 changes: 32 additions & 0 deletions .github/workflows/auto_merge.yml
@@ -0,0 +1,32 @@
name: Auto merge connector PRs Cron

on:
  schedule:
    # 00:00 UTC is 2AM CEST, 3AM EEST, and 5PM PDT the previous day.
    - cron: "0 0 * * *"
  workflow_dispatch:
jobs:
  run_auto_merge:
    name: Run auto-merge
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      - name: Install and configure Poetry
        uses: snok/install-poetry@v1
      - name: Run auto merge
        shell: bash
        working-directory: airbyte-ci/connectors/auto_merge
        env:
          # We need a custom GitHub token because some API endpoints,
          # such as the one to list branch protection rules, are not
          # available to the auto-generated GHA tokens.
          GITHUB_TOKEN: ${{ secrets.AUTO_MERGE_GITHUB_TOKEN }}
          AUTO_MERGE_PRODUCTION: ${{ vars.ENABLE_CONNECTOR_AUTO_MERGE }}
        run: |
          poetry install
          poetry run auto-merge
4 changes: 3 additions & 1 deletion .github/workflows/publish-cdk-command-manually.yml
@@ -224,7 +224,9 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "master" # TODO: figure out why changing this yells with `The ci_gcs_credentials was not set on this PipelineContext.`
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
# Disable the dagger_cloud_token to disable remote cache access.
# See https://github.com/airbytehq/airbyte-internal-issues/issues/6439#issuecomment-2109503985 for context
#dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
5 changes: 4 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,7 +174,10 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.2  | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
| 0.35.0  | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4  | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
ConnectorExceptionUtil.kt
@@ -9,6 +9,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.exceptions.TransientErrorException
import io.airbyte.commons.functional.Either
import java.io.EOFException
import java.sql.SQLException
import java.sql.SQLSyntaxErrorException
import java.util.stream.Collectors
@@ -25,6 +26,8 @@ object ConnectorExceptionUtil {
const val RECOVERY_CONNECTION_ERROR_MESSAGE: String =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
"Please see https://go.airbyte.com/pg-hot-standby-error-message for options and workarounds"
const val DATABASE_CONNECTION_ERROR: String =
"Encountered an error while connecting to the database error"

@JvmField val HTTP_AUTHENTICATION_ERROR_CODES: List<Int> = ImmutableList.of(401, 403)

@@ -35,7 +38,10 @@ object ConnectorExceptionUtil {
}

fun isTransientError(e: Throwable?): Boolean {
return isTransientErrorException(e) || isRecoveryConnectionException(e)
return isTransientErrorException(e) ||
isRecoveryConnectionException(e) ||
isTransientEOFException(e) ||
isTransientSQLException(e)
}

fun getDisplayMessage(e: Throwable?): String? {
@@ -49,6 +55,8 @@
RECOVERY_CONNECTION_ERROR_MESSAGE
} else if (isUnknownColumnInFieldListException(e)) {
e!!.message
} else if (isTransientError(e)) {
DATABASE_CONNECTION_ERROR
} else {
String.format(
COMMON_EXCEPTION_MESSAGE_TEMPLATE,
@@ -137,6 +145,16 @@ object ConnectorExceptionUtil {
return e is ConnectionErrorException
}

private fun isTransientEOFException(e: Throwable?): Boolean {
return (e is EOFException) &&
e.message!!.lowercase().contains("connection was unexpectedly lost")
}

private fun isTransientSQLException(e: Throwable?): Boolean {
return (e is SQLException) &&
            e.message!!.lowercase().contains("an i/o error occurred while sending to the backend")
}

private fun isRecoveryConnectionException(e: Throwable?): Boolean {
return e is SQLException &&
e.message!!.lowercase().contains("due to conflict with recovery")
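
The checks added above classify dropped database connections as transient so the platform can retry instead of failing the sync outright. A minimal standalone sketch of the same classification, assuming only the message fragments visible in this diff (TransientErrorException handling is omitted):

import java.io.EOFException
import java.sql.SQLException

// Sketch: treat known connection-loss messages as transient (retryable).
fun isTransientErrorSketch(e: Throwable?): Boolean {
    val message = e?.message?.lowercase() ?: return false
    return when (e) {
        // Stream ended mid-read: the connection dropped, not a data problem.
        is EOFException -> "connection was unexpectedly lost" in message
        // Postgres hot-standby recovery conflict, or a backend I/O failure.
        is SQLException ->
            "due to conflict with recovery" in message ||
                "an i/o error occurred while sending to the backend" in message
        else -> false
    }
}

fun main() {
    println(isTransientErrorSketch(EOFException("Connection was unexpectedly lost"))) // true
    println(isTransientErrorSketch(SQLException("An I/O error occurred while sending to the backend"))) // true
    println(isTransientErrorSketch(SQLException("syntax error at or near SELECT"))) // false
}

Note that the comparison only works because both the message and the expected fragment are lowercased, which is exactly the bug fixed in isTransientSQLException above.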
version.properties
@@ -1 +1 @@
version=0.34.4
version=0.35.2
CdcSourceTest.kt
@@ -120,6 +120,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
// Do nothing.
}

private fun assertStateDoNotHaveDuplicateStreams(stateMessage: AirbyteStateMessage) {
val dedupedStreamStates =
stateMessage.global.streamStates
.stream()
.map { streamState: AirbyteStreamState -> streamState.streamDescriptor }
.collect(Collectors.toSet())
Assertions.assertEquals(dedupedStreamStates.size, stateMessage.global.streamStates.size)
}

@BeforeEach
protected open fun setup() {
testdb = createTestDatabase()
@@ -616,6 +625,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

val recordMessages1 = extractRecordMessages(actualRecords1)
val stateMessages1 = extractStateMessages(actualRecords1)
stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
val names = HashSet(STREAM_NAMES)
names.add(MODELS_STREAM_NAME_2)

@@ -657,7 +667,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
} else {
assertExpectedStateMessageCountMatches(
stateMessages1,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
assertExpectedRecords(
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
@@ -1236,8 +1246,9 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {

assertExpectedStateMessageCountMatches(
stateAfterFirstBatch,
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
)
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
}

protected open fun assertStateMessagesForNewTableSnapshotTest(
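
assertStateDoNotHaveDuplicateStreams, added at the top of this file and wired into the two tests above, fails whenever a single global state message lists the same stream descriptor twice. The same set-based size comparison as a self-contained sketch, with plain strings standing in for AirbyteStreamState descriptors:

// Sketch: collapsing descriptors into a set exposes any duplicates,
// because a set keeps only one copy of each element.
fun assertNoDuplicateStreams(streamStates: List<String>) {
    val deduped = streamStates.toSet()
    check(deduped.size == streamStates.size) { "Duplicate stream in state: $streamStates" }
}

fun main() {
    assertNoDuplicateStreams(listOf("public.models", "public.models_random")) // passes
    assertNoDuplicateStreams(listOf("public.models", "public.models")) // throws IllegalStateException
}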
CatalogParser.kt
@@ -55,13 +55,13 @@ constructor(
.substring(0, 3)
val newName = "${originalName}_$hash"
actualStreamConfig =
StreamConfig(
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
originalStreamConfig.syncMode,
originalStreamConfig.destinationSyncMode,
originalStreamConfig.primaryKey,
originalStreamConfig.cursor,
originalStreamConfig.columns,
originalStreamConfig.copy(
id =
sqlGenerator.buildStreamId(
originalNamespace,
newName,
rawNamespace,
),
)
} else {
actualStreamConfig = originalStreamConfig
@@ -112,6 +112,18 @@ constructor(

@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null) {
stream.generationId = 0
stream.minimumGenerationId = 0
stream.syncId = 0
}
if (
stream.minimumGenerationId != 0.toLong() &&
stream.minimumGenerationId != stream.generationId
) {
throw UnsupportedOperationException("Hybrid refreshes are not yet supported.")
}

val airbyteColumns =
when (
val schema: AirbyteType =
@@ -143,11 +155,13 @@

return StreamConfig(
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
stream.syncMode,
stream.destinationSyncMode,
primaryKey,
cursor,
columns
columns,
stream.generationId,
stream.minimumGenerationId,
stream.syncId,
)
}

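
toStreamConfig now backfills the generation and sync IDs when an older platform omits them, and rejects the one combination it cannot yet handle: a nonzero minimum generation that differs from the current generation (a partial, "hybrid" refresh). A reduced runnable sketch of that logic, with a hypothetical MutableStream standing in for ConfiguredAirbyteStream:

// Hypothetical stand-in for ConfiguredAirbyteStream's nullable ID fields.
data class MutableStream(
    var generationId: Long? = null,
    var minimumGenerationId: Long? = null,
    var syncId: Long? = null,
)

fun applyGenerationDefaults(stream: MutableStream) {
    // Older platforms send no IDs at all; default the whole group to zero.
    if (stream.generationId == null) {
        stream.generationId = 0L
        stream.minimumGenerationId = 0L
        stream.syncId = 0L
    }
    // minimumGenerationId == 0 keeps all existing data; equal to generationId
    // means a full truncate refresh. Anything in between is a hybrid refresh.
    if (stream.minimumGenerationId != 0L && stream.minimumGenerationId != stream.generationId) {
        throw UnsupportedOperationException("Hybrid refreshes are not yet supported.")
    }
}

fun main() {
    val legacy = MutableStream() // no IDs from the platform
    applyGenerationDefaults(legacy)
    println(legacy) // MutableStream(generationId=0, minimumGenerationId=0, syncId=0)
    applyGenerationDefaults(MutableStream(generationId = 5L, minimumGenerationId = 3L, syncId = 9L)) // throws
}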
StreamConfig.kt
@@ -4,15 +4,16 @@
package io.airbyte.integrations.base.destination.typing_deduping

import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.*
import kotlin.collections.LinkedHashMap

data class StreamConfig(
val id: StreamId,
val syncMode: SyncMode,
val destinationSyncMode: DestinationSyncMode,
val primaryKey: List<ColumnId>,
val cursor: Optional<ColumnId>,
val columns: LinkedHashMap<ColumnId, AirbyteType>,
val generationId: Long,
val minimumGenerationId: Long,
val syncId: Long,
)
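
Since StreamConfig is a data class, call sites only need to name the fields they change: the collision-renaming path in CatalogParser above switches from re-listing every constructor argument to copy(id = ...), so the three new fields ride along automatically. A minimal standalone illustration, with StreamId simplified to a plain String:

// Simplified stand-in for StreamConfig to show the copy(...) idiom.
data class StreamConfigSketch(
    val id: String,
    val generationId: Long,
    val minimumGenerationId: Long,
    val syncId: Long,
)

fun main() {
    val original = StreamConfigSketch("ns.very_long_name", generationId = 3L, minimumGenerationId = 3L, syncId = 7L)
    // Only the ID changes; generation/sync IDs are carried over without being restated.
    val renamed = original.copy(id = "ns.very_long_3f8")
    println(renamed) // StreamConfigSketch(id=ns.very_long_3f8, generationId=3, minimumGenerationId=3, syncId=7)
}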
CatalogParserTest.kt
@@ -10,7 +10,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.List
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.BeforeEach
@@ -74,7 +73,7 @@ internal class CatalogParserTest {
}
val catalog =
ConfiguredAirbyteCatalog()
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))
.withStreams(listOf(stream("a", "foobarfoo"), stream("a", "foofoo")))

val parsedCatalog = parser.parseCatalog(catalog)

@@ -127,13 +126,13 @@
""".trimIndent()
)
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
)
@@ -168,10 +167,10 @@
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
)
@@ -200,6 +199,9 @@
)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withGenerationId(0)
.withMinimumGenerationId(0)
.withSyncId(0)
}
}
}
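
The cleanups in this test file also replace java.util.List.of with Kotlin's listOf and drop the java.util.List import. Both build an immutable list that withStreams accepts, but listOf is the idiomatic choice and avoids shadowing Kotlin's own List type in the file. A quick comparison:

fun main() {
    // Works, but requires the java.util.List import that the diff removes.
    val javaStyle: List<String> = java.util.List.of("a", "b")
    // Idiomatic Kotlin; no import needed.
    val kotlinStyle: List<String> = listOf("a", "b")
    println(javaStyle == kotlinStyle) // true: element-wise equality across List implementations
}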
DefaultTyperDeduperTest.kt
@@ -913,11 +913,13 @@ class DefaultTyperDeduperTest {
"overwrite_ns",
"overwrite_stream"
),
mock(),
DestinationSyncMode.OVERWRITE,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val APPEND_STREAM_CONFIG =
StreamConfig(
@@ -929,11 +931,13 @@
"append_ns",
"append_stream"
),
mock(),
DestinationSyncMode.APPEND,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val DEDUPE_STREAM_CONFIG =
StreamConfig(
@@ -945,11 +949,13 @@
"dedup_ns",
"dedup_stream"
),
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
}
}
DestinationV1V2MigratorTest.kt
@@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest {
migrator: BaseDestinationV1V2Migrator<*>,
expected: Boolean
) {
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock())
val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0)
val actual = migrator.shouldMigrate(config)
Assertions.assertEquals(expected, actual)
}
@@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest {
val config =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val migrator = makeMockMigrator(true, true, false, false, false)
val exception =
@@ -112,11 +114,13 @@
val stream =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val handler = Mockito.mock(DestinationHandler::class.java)
val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table")