[source-postgres] : Provide option to advance LSN (#34781)
akashkulk authored Feb 8, 2024
1 parent c32c2f2 commit e6fff38
Showing 8 changed files with 39 additions and 1 deletion.
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
- dockerImageTag: 3.3.5
+ dockerImageTag: 3.3.7
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
@@ -68,6 +68,11 @@ private static Properties commonProperties(final JdbcDatabase database) {
: HEARTBEAT_INTERVAL;
props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis()));

if (sourceConfig.get("replication_method").has("heartbeat_action_query")
&& !sourceConfig.get("replication_method").get("heartbeat_action_query").asText().isEmpty()) {
props.setProperty("heartbeat.action.query", sourceConfig.get("replication_method").get("heartbeat_action_query").asText());
}

if (PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
props.setProperty("flush.lsn.source", "false");
}
@@ -289,6 +289,13 @@
],
"default": "After loading Data in the destination",
"order": 7
},
"heartbeat_action_query": {
"type": "string",
"title": "Debezium heartbeat query (Advanced)",
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
}
}
},
@@ -290,6 +290,13 @@
],
"default": "After loading Data in the destination",
"order": 7
},
"heartbeat_action_query": {
"type": "string",
"title": "Debezium heartbeat query (Advanced)",
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
}
}
},
@@ -289,6 +289,13 @@
],
"default": "After loading Data in the destination",
"order": 7
},
"heartbeat_action_query": {
"type": "string",
"title": "Debezium heartbeat query (Advanced)",
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
}
}
},
@@ -90,6 +90,7 @@ protected JsonNode config() {
.withoutSsl()
.withCdcReplication("After loading Data in the destination")
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with("heartbeat_action_query", "")
.build();
}

1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
@@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
| 3.3.4 | 2024-01-31 | [34723](https://github.com/airbytehq/airbyte/pull/34723) | Adopt CDK v0.16.3 |
10 changes: 10 additions & 0 deletions docs/integrations/sources/postgres/postgres-troubleshooting.md
@@ -110,4 +110,14 @@ The connector waits for the default initial wait time of 5 minutes (300 seconds)

If you know there are database changes to be synced, but the connector cannot read those changes, the root cause may be insufficient waiting time. In that case, you can increase the waiting time (example: set to 600 seconds) to test if it is indeed the root cause. On the other hand, if you know there are no database changes, you can decrease the wait time to speed up the zero record syncs.

### (Advanced) WAL disk consumption and heartbeat action query

In certain situations, WAL disk consumption keeps growing. This can occur when the database sees a large volume of changes, but only a small percentage of them are made to the databases, schemas, and tables configured for capture. In that case the connector rarely receives an event it can use to acknowledge a newer LSN, so Postgres keeps retaining WAL for the replication slot.

A workaround for this situation is to artificially add events to a heartbeat table that the Airbyte user has write access to. This ensures that Airbyte can process the WAL and prevents disk usage from spiking. To configure this:
1. Create a table (e.g. `airbyte_heartbeat`) in the database and schema being tracked.
2. Add this table to the Airbyte publication.
3. Configure the `heartbeat_action_query` property while setting up the source-postgres connector. Airbyte periodically executes this query against the `airbyte_heartbeat` table. For example, the property can be set to a query like `INSERT INTO airbyte_heartbeat (text) VALUES ('heartbeat')`.


For details, see the [Debezium documentation on WAL disk space consumption](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-wal-disk-space).
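
The three setup steps above could look roughly like the following. This is a minimal sketch, not a prescribed setup: the publication name `airbyte_publication`, the role name `airbyte_user`, and the table's column layout are assumptions made for illustration, and it assumes the publication was created for an explicit list of tables rather than `FOR ALL TABLES`.

```sql
-- Hypothetical names throughout: airbyte_publication, airbyte_user, and the
-- column layout are illustrative assumptions; substitute your own.

-- Step 1: create the heartbeat table in a schema the connector tracks.
CREATE TABLE IF NOT EXISTS airbyte_heartbeat (
    text       TEXT,
    created_at TIMESTAMPTZ DEFAULT now()
);

-- Step 2: add the table to the publication used by the Airbyte source.
ALTER PUBLICATION airbyte_publication ADD TABLE airbyte_heartbeat;

-- Give the Airbyte database user write access to the heartbeat table.
GRANT INSERT ON airbyte_heartbeat TO airbyte_user;

-- Step 3: the statement to enter as the heartbeat_action_query setting.
INSERT INTO airbyte_heartbeat (text) VALUES ('heartbeat');
```

With an insert-only query like this one, the table gains a row on every heartbeat, so it may need occasional pruning.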
