Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-4257] Async replication UX Improvements #481

Merged
merged 12 commits into from
Jun 18, 2024
Merged
11 changes: 9 additions & 2 deletions actions.yaml
Original file line number Diff line number Diff line change
@@ -11,6 +11,13 @@ create-backup:
Differential backup is a copy only of changed data since the last full backup.
Incremental backup is a copy only of changed data since the last backup (any type).
Possible values - full, differential, incremental.
create-replication:
description: Set up asynchronous replication between two clusters.
params:
name:
type: string
description: The name of the replication (defaults to 'default').
default: default
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
@@ -25,10 +32,10 @@ list-backups:
description: Lists backups in s3 storage.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
promote-to-primary:
description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
params:
force-promotion:
force:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
8 changes: 4 additions & 4 deletions metadata.yaml
Original file line number Diff line number Diff line change
@@ -26,8 +26,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
replication-offer:
interface: postgresql_async
limit: 1
optional: true
database:
@@ -41,8 +41,8 @@ provides:
limit: 1

requires:
async-replica:
interface: async_replication
replication:
interface: postgresql_async
limit: 1
optional: true
certificates:
50 changes: 39 additions & 11 deletions src/charm.py
Original file line number Diff line number Diff line change
@@ -90,7 +90,11 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.async_replication import (
REPLICATION_CONSUMER_RELATION,
REPLICATION_OFFER_RELATION,
PostgreSQLAsyncReplication,
)
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
@@ -1222,15 +1226,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
)
return

# Update the password in the PostgreSQL instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
if (
replication_offer_relation is not None
and not self.async_replication.is_primary_cluster()
):
# Update the password in the other cluster PostgreSQL primary instance.
other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
other_cluster_primary = self._patroni.get_primary(
alternative_endpoints=other_cluster_endpoints
)
other_cluster_primary_ip = [
replication_offer_relation.data[unit].get("private-address")
for unit in replication_offer_relation.units
if unit.name.replace("/", "-") == other_cluster_primary
][0]
try:
self.postgresql.update_user_password(
username, password, database_host=other_cluster_primary_ip
)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return
elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
event.fail(
"Failed changing the password: Not all members healthy or finished initial sync."
"Failed changing the password: This action can be ran only in the cluster from the offer side."
)
return
else:
# Update the password in this cluster PostgreSQL primary instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return

# Update the password in the secret store.
self.set_secret(APP_SCOPE, f"{username}-password", password)
@@ -1239,9 +1270,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
@@ -1357,7 +1385,7 @@ def _set_primary_status_message(self) -> None:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
self.unit.status = ActiveStatus("Standby")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
13 changes: 10 additions & 3 deletions src/cluster.py
Original file line number Diff line number Diff line change
@@ -230,19 +230,20 @@ def get_member_status(self, member_name: str) -> str:
return member["state"]
return ""

def get_primary(self, unit_name_pattern=False) -> str:
def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
"""Get primary instance.

Args:
unit_name_pattern: whether to convert pod name to unit name
alternative_endpoints: list of alternative endpoints to check for the primary.

Returns:
primary pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
@@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]:
sync_standbys.append("/".join(member["name"].rsplit("-", 1)))
return sync_standbys

def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
) -> str:
"""Get an alternative REST API URL from another member each time.

When the Patroni process is not running in the current unit it's needed
to use a URL from another cluster member REST API to do some operations.
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
)
attempt_number = attempt.retry_state.attempt_number
if attempt_number > 1:
url = self._patroni_url
346 changes: 233 additions & 113 deletions src/relations/async_replication.py

Large diffs are not rendered by default.

32 changes: 16 additions & 16 deletions tests/integration/ha_tests/test_async_replication.py
Original file line number Diff line number Diff line change
@@ -156,10 +156,10 @@ async def test_async_replication(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary"
first_offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer"
await ops_test.juju(*first_offer_command.split())
first_consume_command = (
f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary"
f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer"
)
await ops_test.juju(*first_consume_command.split())

@@ -173,7 +173,7 @@ async def test_async_replication(
),
)

await second_model.relate(DATABASE_APP_NAME, "async-primary")
await second_model.relate(DATABASE_APP_NAME, "replication-offer")

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
@@ -193,7 +193,7 @@ async def test_async_replication(
leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
assert leader_unit is not None, "No leader unit found"
logger.info("promoting the first cluster")
run_action = await leader_unit.run_action("promote-cluster")
run_action = await leader_unit.run_action("create-replication")
await run_action.wait()
assert (run_action.results.get("return-code", None) == 0) or (
run_action.results.get("Code", None) == "0"
@@ -228,10 +228,10 @@ async def test_switchover(
second_model_continuous_writes,
):
"""Test switching over to the second cluster."""
second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica"
second_offer_command = f"offer {DATABASE_APP_NAME}:replication replication"
await ops_test.juju(*second_offer_command.split())
second_consume_command = (
f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica"
f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication"
)
await ops_test.juju(*second_consume_command.split())

@@ -250,7 +250,7 @@ async def test_switchover(
leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model)
assert leader_unit is not None, "No leader unit found"
logger.info("promoting the second cluster")
run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True})
run_action = await leader_unit.run_action("promote-to-primary", **{"force": True})
await run_action.wait()
assert (run_action.results.get("return-code", None) == 0) or (
run_action.results.get("Code", None) == "0"
@@ -288,16 +288,16 @@ async def test_promote_standby(
"database", f"{APPLICATION_NAME}:first-database"
)
await second_model.applications[DATABASE_APP_NAME].remove_relation(
"async-replica", "async-primary"
"replication", "replication-offer"
)
wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model)
wait_for_relation_removed_between(ops_test, "replication-offer", "replication", second_model)
async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
first_model.wait_for_idle(
apps=[DATABASE_APP_NAME],
status="blocked",
idle_period=IDLE_PERIOD,
timeout=TIMEOUT,
apps=[DATABASE_APP_NAME], idle_period=IDLE_PERIOD, timeout=TIMEOUT
),
first_model.block_until(
lambda: first_model.applications[DATABASE_APP_NAME].status == "blocked",
),
second_model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
@@ -308,7 +308,7 @@ async def test_promote_standby(
leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
assert leader_unit is not None, "No leader unit found"
logger.info("promoting the first cluster")
run_action = await leader_unit.run_action("promote-cluster")
run_action = await leader_unit.run_action("promote-to-primary")
await run_action.wait()
assert (run_action.results.get("return-code", None) == 0) or (
run_action.results.get("Code", None) == "0"
@@ -365,7 +365,7 @@ async def test_reestablish_relation(
await are_writes_increasing(ops_test)

logger.info("reestablishing the relation")
await second_model.relate(DATABASE_APP_NAME, "async-primary")
await second_model.relate(DATABASE_APP_NAME, "replication-offer")
async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
first_model.wait_for_idle(
@@ -384,7 +384,7 @@ async def test_reestablish_relation(
leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
assert leader_unit is not None, "No leader unit found"
logger.info("promoting the first cluster")
run_action = await leader_unit.run_action("promote-cluster")
run_action = await leader_unit.run_action("create-replication")
await run_action.wait()
assert (run_action.results.get("return-code", None) == 0) or (
run_action.results.get("Code", None) == "0"
6 changes: 3 additions & 3 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
@@ -1036,10 +1036,10 @@ async def wait_for_idle_on_blocked(
unit = ops_test.model.units.get(f"{database_app_name}/{unit_number}")
await asyncio.gather(
ops_test.model.wait_for_idle(apps=[other_app_name], status="active"),
ops_test.model.wait_for_idle(
apps=[database_app_name], status="blocked", raise_on_blocked=False
ops_test.model.block_until(
lambda: unit.workload_status == "blocked"
and unit.workload_status_message == status_message
),
ops_test.model.block_until(lambda: unit.workload_status_message == status_message),
)


11 changes: 8 additions & 3 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
@@ -96,11 +96,16 @@ async def test_config_parameters(ops_test: OpsTest) -> None:
logger.info(k)
charm_config[k] = v[0]
await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config)
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="blocked", timeout=100
await ops_test.model.block_until(
lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status
== "blocked",
timeout=100,
)
assert "Configuration Error" in leader_unit.workload_status_message
charm_config[k] = v[1]

await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config)
await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=100)
await ops_test.model.block_until(
lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "active",
timeout=100,
)
4 changes: 1 addition & 3 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
@@ -2299,9 +2299,7 @@ def test_set_active_status(self, _get_primary, _is_standby_leader, _member_start
self.charm.unit.status.message,
"Primary"
if values[0] == self.charm.unit.name
else (
"Standby Leader" if values[1] else ("" if values[2] else "fake status")
),
else ("Standby" if values[1] else ("" if values[2] else "fake status")),
)
else:
_get_primary.side_effect = values[0]