Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Custom replication slot name #543

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, Any]]:
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

# get the slot name from the configuration or use the default value
replication_slot_name = self.config.get("replication_slot_name", "tappostgres")

logical_replication_cursor.start_replication(
slot_name="tappostgres",
slot_name=replication_slot_name, # use slot name
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
Expand Down Expand Up @@ -460,9 +463,11 @@ def logical_replication_connection(self):
Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
f"dbname={self.config['database']} "
f"user={self.config['user']} "
f"password={self.config['password']} "
f"host={self.config['host']} "
f"port={self.config['port']}"
)
return psycopg2.connect(
connection_string,
Expand Down
32 changes: 32 additions & 0 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ def __init__(
)

config_jsonschema = th.PropertiesList(
th.Property(
"replication_slot_name",
th.StringType(
pattern="^(?!pg_)[A-Za-z0-9_]{1,63}$",
),
default="tappostgres",
description=(
"Name of the replication slot to use for logical replication. "
"Must be unique for parallel extractions. "
"Only applicable when replication_method is LOG_BASED."
"- Contain only letters, numbers, and underscores. "
"- Be less than or equal to 63 characters. "
"- Not start with 'pg_'."
),
),
th.Property(
"host",
th.StringType,
Expand Down Expand Up @@ -617,3 +632,20 @@ def discover_streams(self) -> Sequence[Stream]:
PostgresStream(self, catalog_entry, connector=self.connector)
)
return streams


# Configuration Example for Parallel Replication

config_1 = {
"host": "database1.example.com",
"port": 5432,
"dbname": "example_db_1",
"replication_slot_name": "slot_1",
}

config_2 = {
"host": "database2.example.com",
"port": 5432,
"dbname": "example_db_2",
"replication_slot_name": "slot_2",
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove these examples or perhaps move them to the readme?

51 changes: 51 additions & 0 deletions tests/test_slot_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest

from tap_postgres.tap import TapPostgres


class TestReplicationSlot(unittest.TestCase):
def setUp(self):
self.default_config = {
"host": "localhost",
"port": 5432,
"dbname": "test_db",
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
}

def test_default_slot_name(self):
# Test backward compatibility when slot name is not provided.
config = self.default_config
tap = TapPostgres(config=config)
self.assertEqual(
tap.config.get("replication_slot_name", "tappostgres"), "tappostgres"
)

def test_custom_slot_name(self):
# Test if the custom slot name is used.
config = {**self.default_config, "replication_slot_name": "custom_slot"}
tap = TapPostgres(config=config)
self.assertEqual(tap.config["replication_slot_name"], "custom_slot")

def test_multiple_slots(self):
# Simulate using multiple configurations with different slot names.
config_1 = {**self.default_config, "replication_slot_name": "slot_1"}
config_2 = {**self.default_config, "replication_slot_name": "slot_2"}

tap_1 = TapPostgres(config=config_1)
tap_2 = TapPostgres(config=config_2)

self.assertNotEqual(
tap_1.config["replication_slot_name"],
tap_2.config["replication_slot_name"],
)
self.assertEqual(tap_1.config["replication_slot_name"], "slot_1")
self.assertEqual(tap_2.config["replication_slot_name"], "slot_2")

def test_invalid_slot_name(self):
# Test validation for invalid slot names (if any validation rules exist).
invalid_config = {
**self.default_config,
"replication_slot_name": "invalid slot name!",
}

with self.assertRaises(ValueError):
TapPostgres(config=invalid_config)
Loading