Skip to content

Commit

Permalink
[DPE-4256] Async replication UX improvements (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
marceloneppel authored Jun 13, 2024
1 parent c996dd3 commit a627e0b
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 189 deletions.
11 changes: 9 additions & 2 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ create-backup:
Differential backup is a copy only of changed data since the last full backup.
Incremental backup is a copy only of changed data since the last backup (any type).
Possible values - full, differential, incremental.
create-replication:
description: Set up asynchronous replication between two clusters.
params:
name:
type: string
description: The name of the replication (defaults to 'default').
default: default
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand All @@ -25,10 +32,10 @@ list-backups:
description: Lists backups in s3 storage in AWS.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
promote-to-primary:
description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
params:
force-promotion:
force:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
Expand Down
26 changes: 15 additions & 11 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 26
LIBPATCH = 27

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -111,20 +111,19 @@ def __init__(
self.system_users = system_users

def _connect_to_database(
self, database: str = None, connect_to_current_host: bool = False
self, database: str = None, database_host: str = None
) -> psycopg2.extensions.connection:
"""Creates a connection to the database.
Args:
database: database to connect to (defaults to the database
provided when the object for this class was created).
connect_to_current_host: whether to connect to the current host
instead of the primary host.
database_host: host to connect to instead of the primary host.
Returns:
psycopg2 connection object.
"""
host = self.current_host if connect_to_current_host else self.primary_host
host = database_host if database_host is not None else self.primary_host
connection = psycopg2.connect(
f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'"
f"password='{self.password}' connect_timeout=1"
Expand Down Expand Up @@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]:
Set of PostgreSQL text search configs.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;")
text_search_configs = cursor.fetchall()
Expand All @@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]:
Set of PostgreSQL timezones.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT name FROM pg_timezone_names;")
timezones = cursor.fetchall()
Expand Down Expand Up @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=check_current_host
database_host=self.current_host if check_current_host else None
) as connection, connection.cursor() as cursor:
cursor.execute("SHOW ssl;")
return "on" in cursor.fetchone()[0]
Expand Down Expand Up @@ -502,19 +501,24 @@ def set_up_database(self) -> None:
if connection is not None:
connection.close()

def update_user_password(self, username: str, password: str) -> None:
def update_user_password(
self, username: str, password: str, database_host: str = None
) -> None:
"""Update a user password.
Args:
username: the user whose password will be updated.
password: the new password for the user.
database_host: the host to connect to.
Raises:
PostgreSQLUpdateUserPasswordError if the password couldn't be changed.
"""
connection = None
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
with self._connect_to_database(
database_host=database_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format(
sql.Identifier(username)
Expand Down Expand Up @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL(
Expand Down
8 changes: 4 additions & 4 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
replication-offer:
interface: postgresql_async
limit: 1
optional: true
database:
Expand All @@ -55,8 +55,8 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
replication:
interface: postgresql_async
limit: 1
optional: true
certificates:
Expand Down
6 changes: 3 additions & 3 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from constants import BACKUP_TYPE_OVERRIDES, BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION
from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -810,8 +810,8 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:

logger.info("Checking that the cluster is not replicating data to a standby cluster")
for relation in [
self.model.get_relation(ASYNC_REPLICA_RELATION),
self.model.get_relation(ASYNC_PRIMARY_RELATION),
self.model.get_relation(REPLICATION_CONSUMER_RELATION),
self.model.get_relation(REPLICATION_OFFER_RELATION),
]:
if not relation:
continue
Expand Down
50 changes: 39 additions & 11 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni, SwitchoverFailedError
from relations.async_replication import PostgreSQLAsyncReplication
from relations.async_replication import (
REPLICATION_CONSUMER_RELATION,
REPLICATION_OFFER_RELATION,
PostgreSQLAsyncReplication,
)
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
Expand Down Expand Up @@ -844,7 +848,7 @@ def _set_active_status(self):
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
self.unit.status = ActiveStatus("Standby")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down Expand Up @@ -1072,15 +1076,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
)
return

# Update the password in the PostgreSQL instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
if (
replication_offer_relation is not None
and not self.async_replication.is_primary_cluster()
):
# Update the password in the other cluster PostgreSQL primary instance.
other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
other_cluster_primary = self._patroni.get_primary(
alternative_endpoints=other_cluster_endpoints
)
other_cluster_primary_ip = [
replication_offer_relation.data[unit].get("private-address")
for unit in replication_offer_relation.units
if unit.name.replace("/", "-") == other_cluster_primary
][0]
try:
self.postgresql.update_user_password(
username, password, database_host=other_cluster_primary_ip
)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return
elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
event.fail(
"Failed changing the password: Not all members healthy or finished initial sync."
"Failed changing the password: This action can be ran only in the cluster from the offer side."
)
return
else:
# Update the password in this cluster PostgreSQL primary instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return

# Update the password in the secret store.
self.set_secret(APP_SCOPE, f"{username}-password", password)
Expand All @@ -1089,9 +1120,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_get_primary(self, event: ActionEvent) -> None:
Expand Down
29 changes: 11 additions & 18 deletions src/patroni.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,6 @@ def _patroni_url(self) -> str:
"""Patroni REST API URL."""
return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008"

# def configure_standby_cluster(self, host: str) -> None:
# """Configure this cluster as a standby cluster."""
# requests.patch(
# f"{self._patroni_url}/config",
# verify=self._verify,
# json={
# "standby_cluster": {
# "create_replica_methods": ["basebackup"],
# "host": host,
# "port": 5432,
# }
# },
# )

@property
def rock_postgresql_version(self) -> Optional[str]:
"""Version of Postgresql installed in the Rock image."""
Expand All @@ -112,12 +98,18 @@ def rock_postgresql_version(self) -> Optional[str]:
snap_meta = container.pull("/meta.charmed-postgresql/snap.yaml")
return yaml.safe_load(snap_meta)["version"]

def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
) -> str:
"""Get an alternative REST API URL from another member each time.
When the Patroni process is not running in the current unit, some operations
must use the REST API URL of another cluster member instead.
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self._endpoint, alternative_endpoints[attempt.retry_state.attempt_number - 1]
)
if attempt.retry_state.attempt_number > 1:
url = self._patroni_url.replace(
self._endpoint, list(self._endpoints)[attempt.retry_state.attempt_number - 2]
Expand All @@ -126,11 +118,12 @@ def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
url = self._patroni_url
return url

def get_primary(self, unit_name_pattern=False) -> str:
def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
"""Get primary instance.
Args:
unit_name_pattern: whether or not to convert pod name to unit name
alternative_endpoints: list of alternative endpoints to check for the primary.
Returns:
primary pod or unit name.
Expand All @@ -139,8 +132,8 @@ def get_primary(self, unit_name_pattern=False) -> str:
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
r = requests.get(f"{url}/cluster", verify=self._verify)
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
r = requests.get(f"{url}/cluster", verify=self._verify, timeout=5)
for member in r.json()["members"]:
if member["role"] == "leader":
primary = member["name"]
Expand Down
Loading

0 comments on commit a627e0b

Please sign in to comment.