Skip to content

Commit

Permalink
Fix failover and set-secret behaviour
Browse files: browse the repository at this point in the history
Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel committed May 29, 2024
1 parent 3209fc1 commit 7ac4f3f
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 73 deletions.
2 changes: 1 addition & 1 deletion actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pre-upgrade-check:
promote-to-primary:
description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
params:
force-promotion:
force:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
Expand Down
26 changes: 15 additions & 11 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 26
LIBPATCH = 27

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -111,20 +111,19 @@ def __init__(
self.system_users = system_users

def _connect_to_database(
self, database: str = None, connect_to_current_host: bool = False
self, database: str = None, database_host: str = None
) -> psycopg2.extensions.connection:
"""Creates a connection to the database.
Args:
database: database to connect to (defaults to the database
provided when the object for this class was created).
connect_to_current_host: whether to connect to the current host
instead of the primary host.
database_host: host to connect to instead of the primary host.
Returns:
psycopg2 connection object.
"""
host = self.current_host if connect_to_current_host else self.primary_host
host = database_host if database_host is not None else self.primary_host
connection = psycopg2.connect(
f"dbname='{database if database else self.database}' user='{self.user}' host='{host}'"
f"password='{self.password}' connect_timeout=1"
Expand Down Expand Up @@ -388,7 +387,7 @@ def get_postgresql_text_search_configs(self) -> Set[str]:
Set of PostgreSQL text search configs.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT CONCAT('pg_catalog.', cfgname) FROM pg_ts_config;")
text_search_configs = cursor.fetchall()
Expand All @@ -401,7 +400,7 @@ def get_postgresql_timezones(self) -> Set[str]:
Set of PostgreSQL timezones.
"""
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT name FROM pg_timezone_names;")
timezones = cursor.fetchall()
Expand Down Expand Up @@ -434,7 +433,7 @@ def is_tls_enabled(self, check_current_host: bool = False) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=check_current_host
database_host=self.current_host if check_current_host else None
) as connection, connection.cursor() as cursor:
cursor.execute("SHOW ssl;")
return "on" in cursor.fetchone()[0]
Expand Down Expand Up @@ -502,19 +501,24 @@ def set_up_database(self) -> None:
if connection is not None:
connection.close()

def update_user_password(self, username: str, password: str) -> None:
def update_user_password(
self, username: str, password: str, database_host: str = None
) -> None:
"""Update a user password.
Args:
username: the user to update the password.
password: the new password for the user.
database_host: the host to connect to.
Raises:
PostgreSQLUpdateUserPasswordError if the password couldn't be changed.
"""
connection = None
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
with self._connect_to_database(
database_host=database_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL("ALTER USER {} WITH ENCRYPTED PASSWORD '" + password + "';").format(
sql.Identifier(username)
Expand Down Expand Up @@ -610,7 +614,7 @@ def validate_date_style(self, date_style: str) -> bool:
"""
try:
with self._connect_to_database(
connect_to_current_host=True
database_host=self.current_host
) as connection, connection.cursor() as cursor:
cursor.execute(
sql.SQL(
Expand Down
48 changes: 38 additions & 10 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.async_replication import (
REPLICATION_CONSUMER_RELATION,
REPLICATION_OFFER_RELATION,
PostgreSQLAsyncReplication,
)
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
Expand Down Expand Up @@ -1194,15 +1198,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
)
return

# Update the password in the PostgreSQL instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
if (
replication_offer_relation is not None
and not self.async_replication.is_primary_cluster()
):
# Update the password in the other cluster PostgreSQL primary instance.
other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
other_cluster_primary = self._patroni.get_primary(
alternative_endpoints=other_cluster_endpoints
)
other_cluster_primary_ip = [
replication_offer_relation.data[unit].get("private-address")
for unit in replication_offer_relation.units
if unit.name.replace("/", "-") == other_cluster_primary
][0]
try:
self.postgresql.update_user_password(
username, password, database_host=other_cluster_primary_ip
)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return
elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
event.fail(
"Failed changing the password: Not all members healthy or finished initial sync."
"Failed changing the password: This action can be ran only in the cluster from the offer side."
)
return
else:
# Update the password in this cluster PostgreSQL primary instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return

# Update the password in the secret store.
self.set_secret(APP_SCOPE, f"{username}-password", password)
Expand All @@ -1211,9 +1242,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
Expand Down
13 changes: 10 additions & 3 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,20 @@ def get_member_status(self, member_name: str) -> str:
return member["state"]
return ""

def get_primary(self, unit_name_pattern=False) -> str:
def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
"""Get primary instance.
Args:
unit_name_pattern: whether to convert pod name to unit name
alternative_endpoints: list of alternative endpoints to check for the primary.
Returns:
primary pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
Expand Down Expand Up @@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]:
sync_standbys.append("/".join(member["name"].rsplit("-", 1)))
return sync_standbys

def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
) -> str:
"""Get an alternative REST API URL from another member each time.
When the Patroni process is not running in the current unit it's needed
to use a URL from another cluster member REST API to do some operations.
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
)
attempt_number = attempt.retry_state.attempt_number
if attempt_number > 1:
url = self._patroni_url
Expand Down
Loading

0 comments on commit 7ac4f3f

Please sign in to comment.