Skip to content

Commit

Permalink
[DPE-2953] Cross-region async replication (#452)
Browse files Browse the repository at this point in the history
* Add async replication implementation

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Backup standby pgdata folder

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix OS call

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix unit tests

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Improve comments and logs

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Revert permission change

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Add optional type hint

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Add relation name to secret label and revert poetry.lock

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Reload Patroni configuration when member is not ready yet

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

---------

Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel authored May 3, 2024
1 parent 35673fb commit 448a1a6
Show file tree
Hide file tree
Showing 9 changed files with 967 additions and 62 deletions.
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ list-backups:
description: Lists backups in s3 storage.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
params:
force-promotion:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
description: Restore a database backup using pgBackRest.
S3 credentials are retrieved from a relation with the S3 integrator charm.
Expand Down
45 changes: 23 additions & 22 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency.
"""

import logging
from collections import OrderedDict
from typing import Dict, List, Optional, Set, Tuple
Expand All @@ -35,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 24
LIBPATCH = 26

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -358,9 +359,7 @@ def _generate_database_privileges_statements(
statements.append(
"""UPDATE pg_catalog.pg_largeobject_metadata
SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}')
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(
user, self.user
)
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user)
)
else:
for schema in schemas:
Expand Down Expand Up @@ -477,11 +476,11 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return

# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
Expand All @@ -491,6 +490,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
Expand Down Expand Up @@ -562,18 +565,16 @@ def build_postgresql_parameters(
parameters = {}
for config, value in config_options.items():
# Filter config option not related to PostgreSQL parameters.
if not config.startswith(
(
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)
):
if not config.startswith((
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)):
continue
parameter = "_".join(config.split("_")[1:])
if parameter in ["date_style", "time_zone"]:
Expand All @@ -594,8 +595,8 @@ def build_postgresql_parameters(
# and the remaining as cache memory.
shared_buffers = int(available_memory * 0.25)
effective_cache_size = int(available_memory - shared_buffers)
parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"})
parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"})
else:
# Return default
parameters.setdefault("shared_buffers", "128MB")
Expand Down
8 changes: 8 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
limit: 1
optional: true
database:
interface: postgresql_client
db:
Expand All @@ -37,6 +41,10 @@ provides:
limit: 1

requires:
async-replica:
interface: async_replication
limit: 1
optional: true
certificates:
interface: tls-certificates
limit: 1
Expand Down
50 changes: 44 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
Expand Down Expand Up @@ -166,6 +167,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.backup = PostgreSQLBackups(self, "s3-parameters")
self.tls = PostgreSQLTLS(self, PEER)
self.async_replication = PostgreSQLAsyncReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
Expand Down Expand Up @@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
Expand Down Expand Up @@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_pgdata_storage_detaching(self, _) -> None:
# Change the primary if it's the unit that is being removed.
try:
Expand Down Expand Up @@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent):

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
Expand Down Expand Up @@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None:
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
self._update_relation_endpoints()
if not self.is_blocked:
self.unit.status = ActiveStatus()
self.async_replication.handle_read_only_mode()
else:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)

Expand Down Expand Up @@ -688,6 +698,7 @@ def _hosts(self) -> set:
def _patroni(self) -> Patroni:
"""Returns an instance of the Patroni object."""
return Patroni(
self,
self._unit_ip,
self.cluster_name,
self._member_name,
Expand All @@ -704,6 +715,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self.unit.name == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
"""Return whether this unit is the standby leader instance."""
return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True)

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
Expand Down Expand Up @@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self.unit.status = ActiveStatus()

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

if not self.unit.is_leader():
return

Expand Down Expand Up @@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
Expand Down Expand Up @@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
Expand Down Expand Up @@ -1225,6 +1250,9 @@ def _on_update_status(self, _) -> None:
if self._handle_workload_failures():
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

self._set_primary_status_message()

# Restart topology observer if it is gone
Expand Down Expand Up @@ -1270,8 +1298,16 @@ def _handle_workload_failures(self) -> bool:
a bool indicating whether the charm performed any action.
"""
# Restart the workload if it's stuck on the starting state after a restart.
try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self._patroni.member_started
not is_primary
and not is_standby_leader
and not self._patroni.member_started
and "postgresql_restarted" in self._peers.data[self.unit]
and self._patroni.member_replication_lag == "unknown"
):
Expand All @@ -1291,6 +1327,8 @@ def _set_primary_status_message(self) -> None:
try:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down
Loading

0 comments on commit 448a1a6

Please sign in to comment.