Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2953] Cross-region async replication #452

Merged
merged 11 commits into from
May 3, 2024
Next Next commit
Add async replication implementation
Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>
  • Loading branch information
marceloneppel committed Apr 22, 2024
commit ea1dedd352481f150f172f97e6dc10fa7daee824
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
@@ -17,6 +17,12 @@ list-backups:
description: Lists backups in s3 storage.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
params:
force-promotion:
taurus-forever marked this conversation as resolved.
Show resolved Hide resolved
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
taurus-forever marked this conversation as resolved.
Show resolved Hide resolved
restore:
description: Restore a database backup using pgBackRest.
S3 credentials are retrieved from a relation with the S3 integrator charm.
45 changes: 23 additions & 22 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency.
"""

import logging
from collections import OrderedDict
from typing import Dict, List, Optional, Set, Tuple
@@ -35,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 24
LIBPATCH = 26

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

@@ -358,9 +359,7 @@ def _generate_database_privileges_statements(
statements.append(
"""UPDATE pg_catalog.pg_largeobject_metadata
SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}')
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(
user, self.user
)
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user)
)
else:
for schema in schemas:
@@ -477,11 +476,11 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return

# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
@@ -491,6 +490,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
@@ -562,18 +565,16 @@ def build_postgresql_parameters(
parameters = {}
for config, value in config_options.items():
# Filter config option not related to PostgreSQL parameters.
if not config.startswith(
(
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)
):
if not config.startswith((
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)):
continue
parameter = "_".join(config.split("_")[1:])
if parameter in ["date_style", "time_zone"]:
@@ -594,8 +595,8 @@ def build_postgresql_parameters(
# and the remaining as cache memory.
shared_buffers = int(available_memory * 0.25)
effective_cache_size = int(available_memory - shared_buffers)
parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"})
parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"})
else:
# Return default
parameters.setdefault("shared_buffers", "128MB")
8 changes: 8 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
@@ -26,6 +26,10 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
limit: 1
optional: true
taurus-forever marked this conversation as resolved.
Show resolved Hide resolved
database:
interface: postgresql_client
db:
@@ -37,6 +41,10 @@ provides:
limit: 1

requires:
async-replica:
interface: async_replication
limit: 1
optional: true
taurus-forever marked this conversation as resolved.
Show resolved Hide resolved
certificates:
interface: tls-certificates
limit: 1
3 changes: 1 addition & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 44 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
@@ -86,6 +86,7 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
@@ -179,6 +180,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.backup = PostgreSQLBackups(self, "s3-parameters")
self.tls = PostgreSQLTLS(self, PEER)
self.async_replication = PostgreSQLAsyncReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
@@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
@@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_pgdata_storage_detaching(self, _) -> None:
# Change the primary if it's the unit that is being removed.
try:
@@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent):

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
@@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None:
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
self._update_relation_endpoints()
if not self.is_blocked:
self.unit.status = ActiveStatus()
self.async_replication.handle_read_only_mode()
else:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)

@@ -688,6 +698,7 @@ def _hosts(self) -> set:
def _patroni(self) -> Patroni:
"""Returns an instance of the Patroni object."""
return Patroni(
self,
self._unit_ip,
self.cluster_name,
self._member_name,
@@ -704,6 +715,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self.unit.name == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
"""Return whether this unit is the standby leader instance."""
return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True)

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
@@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self.unit.status = ActiveStatus()

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

if not self.unit.is_leader():
return

@@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
@@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
@@ -1225,6 +1250,9 @@ def _on_update_status(self, _) -> None:
if self._handle_workload_failures():
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

self._set_primary_status_message()

# Restart topology observer if it is gone
@@ -1270,8 +1298,16 @@ def _handle_workload_failures(self) -> bool:
a bool indicating whether the charm performed any action.
"""
# Restart the workload if it's stuck on the starting state after a restart.
try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self._patroni.member_started
not is_primary
and not is_standby_leader
and not self._patroni.member_started
and "postgresql_restarted" in self._peers.data[self.unit]
and self._patroni.member_replication_lag == "unknown"
):
@@ -1291,6 +1327,8 @@ def _set_primary_status_message(self) -> None:
try:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Loading
Loading