Skip to content

Commit

Permalink
[DPE-4427] Address main instability sources on backups integration tests (#496)
Browse files Browse the repository at this point in the history

* add postgres connection check

* add leader check in peer event + remove restart

* remove unit test

* refactor initialization check

* catch RetryError and defer tls event

* add primary switchover on scale-down

* revert patroni restart change

* add sleep to tls check and catch modelError

* wait before first backup + better logging on TLS test

* add wait for model in TLS test + idle timeout

* catch retry error from patroni call + increase retries

* revert retry attempt limit change

* revert wait model on tls test

* update test_charm parameter

* refactor on initialize cluster check + make self_healing fail fast

* add shared buffers to dynamic config

* revert + make wait for TLS in test_backups

* clean previous changes

* try waiting for idle after relating

* reposition wait to avoid charm stuck

* bump LIBPATCH and set log to warning level
  • Loading branch information
lucasgameiroborges authored Jun 11, 2024
1 parent 04c217e commit c996dd3
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 13 deletions.
5 changes: 3 additions & 2 deletions lib/charms/postgresql_k8s/v0/postgresql_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from ops.charm import ActionEvent, RelationBrokenEvent
from ops.framework import Object
from ops.pebble import ConnectionError, PathError, ProtocolError
from tenacity import RetryError

# The unique Charmhub library identifier, never change it
LIBID = "c27af44a92df4ef38d7ae06418b2800f"
Expand All @@ -43,7 +44,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version.
LIBPATCH = 8
LIBPATCH = 9

logger = logging.getLogger(__name__)
SCOPE = "unit"
Expand Down Expand Up @@ -142,7 +143,7 @@ def _on_certificate_available(self, event: CertificateAvailableEvent) -> None:
logger.debug("Cannot push TLS certificates at this moment")
event.defer()
return
except (ConnectionError, PathError, ProtocolError) as e:
except (ConnectionError, PathError, ProtocolError, RetryError) as e:
logger.error("Cannot push TLS certificates: %r", e)
event.defer()
return
Expand Down
95 changes: 88 additions & 7 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
Container,
JujuVersion,
MaintenanceStatus,
ModelError,
Relation,
Unit,
UnknownStatus,
WaitingStatus,
)
from ops.pebble import ChangeError, Layer, PathError, ProtocolError, ServiceStatus
from requests import ConnectionError
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed
from tenacity import RetryError, Retrying, stop_after_attempt, stop_after_delay, wait_fixed

from backups import PostgreSQLBackups
from config import CharmConfig
Expand Down Expand Up @@ -80,7 +81,7 @@
WORKLOAD_OS_GROUP,
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from patroni import NotReadyError, Patroni, SwitchoverFailedError
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
Expand Down Expand Up @@ -144,6 +145,7 @@ def __init__(self, *args):
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.postgresql_pebble_ready, self._on_postgresql_pebble_ready)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
self.framework.observe(self.on.stop, self._on_stop)
self.framework.observe(self.on.upgrade_charm, self._on_upgrade_charm)
self.framework.observe(self.on.get_password_action, self._on_get_password)
Expand Down Expand Up @@ -379,14 +381,68 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_peer_relation_changed(self, event: HookEvent) -> None:
def _on_pgdata_storage_detaching(self, _) -> None:
    """Hand the primary role to another member before this unit's storage is removed.

    If this unit is not the current Patroni primary there is nothing to do.
    Otherwise a controlled switchover is attempted; every failure path simply
    returns, because Patroni runs an automatic failover once the unit is gone.
    """
    # Change the primary if it's the unit that is being removed.
    try:
        primary = self._patroni.get_primary(unit_name_pattern=True)
    except RetryError:
        # Ignore the event if the primary couldn't be retrieved.
        # If a switchover is needed, an automatic failover will be triggered
        # when the unit is removed.
        logger.debug("Early exit on_pgdata_storage_detaching: primary cannot be retrieved")
        return

    # Only the unit that currently holds the primary role needs to act.
    if self.unit.name != primary:
        return

    # A switchover requires every member to be healthy; otherwise fall back
    # to Patroni's automatic failover after the unit is removed.
    if not self._patroni.are_all_members_ready():
        logger.warning(
            "could not switchover because not all members are ready"
            " - an automatic failover will be triggered"
        )
        return

    # Try to switchover to another member and raise an exception if it doesn't succeed.
    # If it doesn't happen on time, Patroni will automatically run a fail-over.
    try:
        # Get the current primary to check if it has changed later.
        current_primary = self._patroni.get_primary()

        # Trigger the switchover.
        self._patroni.switchover()

        # Wait for the switchover to complete (primary_changed retries until
        # the reported primary differs from the one recorded above).
        self._patroni.primary_changed(current_primary)

        logger.info("successful switchover")
    except (RetryError, SwitchoverFailedError) as e:
        logger.warning(
            f"switchover failed with reason: {e} - an automatic failover will be triggered"
        )
        return

    # Switchover succeeded: drop this unit from the client-facing endpoints
    # and refresh the read-only endpoint exposed to related applications.
    endpoints_to_remove = self._get_endpoints_to_remove()
    self.postgresql_client_relation.update_read_only_endpoint()
    self._remove_from_endpoints(endpoints_to_remove)

def _on_peer_relation_changed(self, event: HookEvent) -> None: # noqa: C901
"""Reconfigure cluster members."""
# The cluster must be initialized first in the leader unit
# before any other member joins the cluster.
if "cluster_initialised" not in self._peers.data[self.app]:
logger.debug(
"Deferring on_peer_relation_changed: Cluster must be initialized before members can join"
)
if self.unit.is_leader():
if self._initialize_cluster(event):
logger.debug("Deferring on_peer_relation_changed: Leader initialized cluster")
else:
logger.debug("_initialized_cluster failed on _peer_relation_changed")
return
else:
logger.debug(
"Deferring on_peer_relation_changed: Cluster must be initialized before members can join"
)
event.defer()
return

Expand Down Expand Up @@ -437,7 +493,10 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:
event.defer()
return

self.postgresql_client_relation.update_read_only_endpoint()
try:
self.postgresql_client_relation.update_read_only_endpoint()
except ModelError as e:
logger.warning("Cannot update read_only endpoints: %s", str(e))

self.backup.coordinate_stanza_fields()

Expand Down Expand Up @@ -594,6 +653,9 @@ def _add_members(self, event) -> None:
except NotReadyError:
logger.info("Deferring reconfigure: another member doing sync right now")
event.defer()
except RetryError:
logger.info("Deferring reconfigure: failed to obtain cluster members from Patroni")
event.defer()

def add_cluster_member(self, member: str) -> None:
"""Add member to the cluster if all members are already up and running.
Expand Down Expand Up @@ -1432,6 +1494,14 @@ def _restart(self, event: RunWithLock) -> None:
# Update health check URL.
self._update_pebble_layers()

try:
for attempt in Retrying(wait=wait_fixed(3), stop=stop_after_delay(300)):
with attempt:
if not self._can_connect_to_postgresql:
assert False
except Exception:
logger.exception("Unable to reconnect to postgresql")

# Start or stop the pgBackRest TLS server service when TLS certificate change.
self.backup.start_stop_pgbackrest_service()

Expand All @@ -1448,6 +1518,17 @@ def _is_workload_running(self) -> bool:

return services[0].current == ServiceStatus.ACTIVE

@property
def _can_connect_to_postgresql(self) -> bool:
    """Whether a PostgreSQL client query against this unit currently succeeds.

    Probes the server by fetching the available timezones, retrying for up to
    30 seconds with 3 seconds between attempts, so a briefly restarting
    server is not immediately reported as unreachable.

    Returns:
        True if a query succeeded within the retry window, False otherwise.
    """
    try:
        for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)):
            with attempt:
                # A falsy result means the server is not serving queries yet.
                # Raise an explicit exception (instead of `assert`, which is
                # stripped under `python -O` and would make this property
                # always return True) so tenacity keeps retrying.
                if not self.postgresql.get_postgresql_timezones():
                    raise RuntimeError("PostgreSQL is not yet answering queries")
    except RetryError:
        logger.debug("Cannot connect to database")
        return False
    return True

def update_config(self, is_creating_backup: bool = False) -> bool:
"""Updates Patroni config file based on the existence of the TLS files."""
# Retrieve PostgreSQL parameters.
Expand Down
11 changes: 11 additions & 0 deletions src/patroni.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
RetryError,
Retrying,
retry,
retry_if_result,
stop_after_attempt,
stop_after_delay,
wait_exponential,
Expand Down Expand Up @@ -490,3 +491,13 @@ def switchover(self, candidate: str = None) -> None:
new_primary = self.get_primary()
if (candidate is not None and new_primary != candidate) or new_primary == primary:
raise SwitchoverFailedError("primary was not switched correctly")

@retry(
    retry=retry_if_result(lambda x: not x),
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=1, min=2, max=30),
)
def primary_changed(self, old_primary: str) -> bool:
    """Report whether the cluster primary differs from *old_primary*.

    The tenacity decorator retries while the result is falsy (up to 10
    attempts with exponential back-off), so callers effectively wait for a
    switchover to take effect.
    """
    return self.get_primary() != old_primary
11 changes: 9 additions & 2 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
await build_and_deploy(
ops_test, 2, database_app_name=database_app_name, wait_for_idle=False
)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)

await ops_test.model.relate(database_app_name, TLS_CERTIFICATES_APP_NAME)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[database_app_name], status="active", timeout=1000
)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)

# Configure and set access and secret keys.
logger.info(f"configuring S3 integrator for {cloud}")
Expand Down Expand Up @@ -142,7 +147,9 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
)
connection.close()

# Run the "create backup" action.
# With a stable cluster, Run the "create backup" action
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30)
logger.info("creating a backup")
action = await ops_test.model.units.get(replica).run_action("create-backup")
await action.wait()
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def test_postgresql_parameters_change(ops_test: OpsTest) -> None:
"""Test that's possible to change PostgreSQL parameters."""
await ops_test.model.applications[APP_NAME].set_config({
"memory_max_prepared_transactions": "100",
"memory_shared_buffers": "128",
"memory_shared_buffers": "32768", # 2 * 128MB. Patroni may refuse the config if < 128MB
"response_lc_monetary": "en_GB.utf8",
"experimental_max_connections": "200",
})
Expand Down Expand Up @@ -206,7 +206,7 @@ async def test_postgresql_parameters_change(ops_test: OpsTest) -> None:

# Validate each configuration set by Patroni on PostgreSQL.
assert settings["max_prepared_transactions"] == "100"
assert settings["shared_buffers"] == "128"
assert settings["shared_buffers"] == "32768"
assert settings["lc_monetary"] == "en_GB.utf8"
assert settings["max_connections"] == "200"
finally:
Expand Down

0 comments on commit c996dd3

Please sign in to comment.