diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 87862c712f1d..d471e2a7c959 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.5 + dockerImageTag: 3.3.7 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java index 5aa9f208d13a..745f92cfceaa 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcProperties.java @@ -68,6 +68,11 @@ private static Properties commonProperties(final JdbcDatabase database) { : HEARTBEAT_INTERVAL; props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis())); + if (sourceConfig.get("replication_method").has("heartbeat_action_query") + && !sourceConfig.get("replication_method").get("heartbeat_action_query").asText().isEmpty()) { + props.setProperty("heartbeat.action.query", sourceConfig.get("replication_method").get("heartbeat_action_query").asText()); + } + if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) { props.setProperty("flush.lsn.source", "false"); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json 
b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json index f00b42a5507a..874b284be01d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json @@ -289,6 +289,13 @@ ], "default": "After loading Data in the destination", "order": 7 + }, + "heartbeat_action_query": { + "type": "string", + "title": "Debezium heartbeat query (Advanced)", + "description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the setup guide for how and when to configure this setting.", + "default": "", + "order": 8 } } }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json index 375ea5024c99..96fe095dbc9e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json @@ -290,6 +290,13 @@ ], "default": "After loading Data in the destination", "order": 7 + }, + "heartbeat_action_query": { + "type": "string", + "title": "Debezium heartbeat query (Advanced)", + "description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. 
Please see the setup guide for how and when to configure this setting.", + "default": "", + "order": 8 } } }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json index 8b09d54fd1b6..febf87168ec9 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json @@ -289,6 +289,13 @@ ], "default": "After loading Data in the destination", "order": 7 + }, + "heartbeat_action_query": { + "type": "string", + "title": "Debezium heartbeat query (Advanced)", + "description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the setup guide for how and when to configure this setting.", + "default": "", + "order": 8 } } }, diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 2a2b2ece3056..490cd4f3e400 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -90,6 +90,7 @@ protected JsonNode config() { .withoutSsl() .withCdcReplication("After loading Data in the destination") .with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1) + .with("heartbeat_action_query", "") .build(); } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 8f514c06e0a5..6c354286563c 100644 --- a/docs/integrations/sources/postgres.md +++ 
b/docs/integrations/sources/postgres.md @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. | | 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 | | 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 | | 3.3.4 | 2024-01-31 | [34723](https://github.com/airbytehq/airbyte/pull/34723) | Adopt CDK v0.16.3 | diff --git a/docs/integrations/sources/postgres/postgres-troubleshooting.md b/docs/integrations/sources/postgres/postgres-troubleshooting.md index 556118a6c1d1..721c4b694269 100644 --- a/docs/integrations/sources/postgres/postgres-troubleshooting.md +++ b/docs/integrations/sources/postgres/postgres-troubleshooting.md @@ -110,4 +110,14 @@ The connector waits for the default initial wait time of 5 minutes (300 seconds) If you know there are database changes to be synced, but the connector cannot read those changes, the root cause may be insufficient waiting time. In that case, you can increase the waiting time (example: set to 600 seconds) to test if it is indeed the root cause. On the other hand, if you know there are no database changes, you can decrease the wait time to speed up the zero record syncs. +### (Advanced) WAL disk consumption and heartbeat action query +In certain situations, WAL disk consumption increases. This can occur when there are a large volume of changes, but only a small percentage of them are being made to the databases, schemas and tables configured for capture. 
+ +A workaround for this situation is to artificially add events to a heartbeat table that the Airbyte user has write access to. This will ensure that Airbyte can process the WAL and prevent disk usage from spiking. To configure this: +1. Create a table (e.g. `airbyte_heartbeat`) in the database and schema being tracked. +2. Add this table to the Airbyte publication. +3. Configure the `heartbeat_action_query` property while setting up the source-postgres connector. This query will be periodically executed by Airbyte on the `airbyte_heartbeat` table. For example, this parameter can be set to a query like `INSERT INTO airbyte_heartbeat (text) VALUES ('heartbeat')`. + + +See detailed documentation [here](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-wal-disk-space).