Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Custom replication slot name #543

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f0abd1e
add support for configurable replication_slot_name in logical replica…
RobyBen Nov 23, 2024
b9de248
fixes
RobyBen Nov 23, 2024
d2c6ed5
another fixes
RobyBen Nov 23, 2024
970ec8d
idk why dont work
RobyBen Nov 23, 2024
c7e7236
finaly&
RobyBen Nov 23, 2024
f811255
ruff format
RobyBen Nov 23, 2024
802bda8
little changes client.py and tap.py
RobyBen Nov 24, 2024
e81b989
ruff format again
RobyBen Nov 24, 2024
098b239
changed th.Property
RobyBen Nov 24, 2024
a8e2720
I forgot to delete the asserts
RobyBen Nov 24, 2024
d2b38aa
Move `pattern` parameter to `StringType` instance
edgarrmondragon Nov 25, 2024
eb7bea3
Update tap_postgres/tap.py
edgarrmondragon Nov 25, 2024
1aceb57
Merge branch 'main' into feature/slot_name
edgarrmondragon Nov 25, 2024
4f9deb3
Use keyword argument in tests
edgarrmondragon Nov 25, 2024
80071d2
Add `user`
edgarrmondragon Nov 25, 2024
cc8960f
Add `password`
edgarrmondragon Nov 25, 2024
c61234f
Update tests/test_slot_name.py
edgarrmondragon Nov 25, 2024
14f1854
changed default_config in the test, maybe it will help
RobyBen Nov 30, 2024
e45cd2a
Changed default_config in the test, maybe it will help. Please check …
RobyBen Nov 30, 2024
40a06fd
Merge branch 'main' into feature/slot_name
RobyBen Nov 30, 2024
6f21aa3
Merge branch 'feature/slot_name' of https://github.com/RobyBen/tap-po…
RobyBen Nov 30, 2024
73b7e8b
Ok, the default_config in the test should have changed , maybe that w…
RobyBen Nov 30, 2024
e5a3037
Merge branch 'main' into feature/slot_name
edgarrmondragon Dec 2, 2024
0927913
Changed test_slot_name
RobyBen Dec 2, 2024
2e11ad4
Merge branch 'feature/slot_name' of https://github.com/RobyBen/tap-po…
RobyBen Dec 2, 2024
f6bd4bb
Use pytest-style tests
edgarrmondragon Dec 2, 2024
d567ba4
Use hypothesis
edgarrmondragon Dec 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
395 changes: 233 additions & 162 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ extras = ["faker"]

[tool.poetry.group.dev.dependencies]
faker = ">=18.5.1"
hypothesis = ">=6.122.1"
mypy = ">=1.8.0"
pre-commit = ">=3.0.4"
ruff = "~=0.8.0"
Expand Down Expand Up @@ -102,7 +103,7 @@ select = [
"ICN", # flake8-import-conventions
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"TC", # flake8-type-checking
"ERA", # eradicate
"PGH", # pygrep-hooks
"PL", # Pylint
Expand Down
15 changes: 10 additions & 5 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class PostgresLogBasedStream(SQLStream):

connector_class = PostgresConnector

# JSONB Objects won't be selected without type_confomance_level to ROOT_ONLY
# JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY

replication_key = "_sdc_lsn"
Expand Down Expand Up @@ -339,8 +339,11 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]:
# even though we still want logs with an LSN == start_lsn.
logical_replication_cursor.send_feedback(flush_lsn=start_lsn)

# get the slot name from the configuration or use the default value
replication_slot_name = self.config.get("replication_slot_name", "tappostgres")

logical_replication_cursor.start_replication(
slot_name="tappostgres",
slot_name=replication_slot_name, # use slot name
decode=True,
start_lsn=start_lsn,
status_interval=status_interval,
Expand Down Expand Up @@ -459,9 +462,11 @@ def logical_replication_connection(self):
Uses a direct psycopg2 implementation rather than through sqlalchemy.
"""
connection_string = (
f"dbname={self.config['database']} user={self.config['user']} password="
f"{self.config['password']} host={self.config['host']} port="
f"{self.config['port']}"
f"dbname={self.config['database']} "
f"user={self.config['user']} "
f"password={self.config['password']} "
f"host={self.config['host']} "
f"port={self.config['port']}"
)
return psycopg2.connect(
connection_string,
Expand Down
16 changes: 16 additions & 0 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from collections.abc import Mapping, Sequence


# JSON Schema pattern for the ``replication_slot_name`` setting.
# PostgreSQL only accepts lower-case letters, digits and underscores in a
# replication slot name, so upper-case letters are rejected here at config
# validation time instead of failing later when replication starts.
# The length is capped at 63 characters and the ``pg_`` prefix is disallowed.
REPLICATION_SLOT_PATTERN = "^(?!pg_)[a-z0-9_]{1,63}$"


class TapPostgres(SQLTap):
"""Singer tap for Postgres."""

Expand Down Expand Up @@ -98,6 +101,19 @@ def __init__(
)

config_jsonschema = th.PropertiesList(
th.Property(
    "replication_slot_name",
    # The pattern is enforced by JSON Schema validation of the tap config.
    th.StringType(pattern=REPLICATION_SLOT_PATTERN),
    default="tappostgres",
    # Adjacent string literals are concatenated, so every fragment except
    # the last must end with a space to keep the sentences separated.
    description=(
        "Name of the replication slot to use for logical replication. "
        "Must be unique for parallel extractions. "
        "Only applicable when replication_method is LOG_BASED. "
        "The name must contain only letters, numbers, and underscores; "
        "be less than or equal to 63 characters; "
        "and not start with 'pg_'."
    ),
),
th.Property(
"host",
th.StringType,
Expand Down
60 changes: 60 additions & 0 deletions tests/test_slot_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest
from hypothesis import given
from hypothesis import strategies as st
from singer_sdk.exceptions import ConfigValidationError

from tap_postgres.tap import REPLICATION_SLOT_PATTERN, TapPostgres
from tests.settings import DB_SQLALCHEMY_URL


@pytest.fixture
def default_config():
    """Return a minimal tap config holding only the test database URL."""
    base_config = {"sqlalchemy_url": DB_SQLALCHEMY_URL}
    return base_config


def test_default_slot_name(default_config: dict):
    """Check the legacy slot name is used when none is configured."""
    expected_slot = "tappostgres"
    tap = TapPostgres(config=default_config, setup_mapper=False)
    resolved_slot = tap.config.get("replication_slot_name", expected_slot)
    assert resolved_slot == expected_slot


# ``fullmatch=True`` makes Hypothesis generate only strings that match the
# whole pattern. Without it ``from_regex`` uses search semantics, and because
# ``$`` also matches before a trailing newline it can produce names such as
# "abc\n", which are not valid replication slot names.
@given(st.from_regex(REPLICATION_SLOT_PATTERN, fullmatch=True))
def test_custom_slot_name(s: str):
    """Test that any valid custom slot name is accepted and kept in the config."""
    config = {
        "sqlalchemy_url": DB_SQLALCHEMY_URL,
        "replication_slot_name": s,
    }
    tap = TapPostgres(config=config, setup_mapper=False)
    assert tap.config["replication_slot_name"] == s


def test_multiple_slots(default_config: dict):
    """Two taps configured with different slot names keep their own name."""
    first_tap = TapPostgres(
        config={**default_config, "replication_slot_name": "slot_1"},
        setup_mapper=False,
    )
    second_tap = TapPostgres(
        config={**default_config, "replication_slot_name": "slot_2"},
        setup_mapper=False,
    )

    first_slot = first_tap.config["replication_slot_name"]
    second_slot = second_tap.config["replication_slot_name"]

    # The names must stay distinct and match what each tap was given.
    assert first_slot != second_slot
    assert first_slot == "slot_1"
    assert second_slot == "slot_2"


def test_invalid_slot_name(default_config: dict):
    """A slot name that violates the pattern is rejected at config validation."""
    bad_name = "invalid slot name!"
    bad_config = dict(default_config, replication_slot_name=bad_name)

    with pytest.raises(ConfigValidationError, match="does not match") as raised:
        TapPostgres(config=bad_config, setup_mapper=False)

    # Inspect the captured exception only after the ``with`` block has exited.
    reported = raised.value.errors
    assert len(reported) == 1
    assert bad_name in reported[0]
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ min_version = 4

[testenv]
deps =
hypothesis
pytest
commands =
pytest
Expand Down
Loading