Skip to content

Commit

Permalink
[DPE-2897] Cross-region async replication (#447)
Browse files Browse the repository at this point in the history
* Add async replication implementation

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Backup standby pgdata folder

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Improve comments and logs

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Remove unused constant

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Remove warning log call and add optional type hint

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Revert poetry.lock

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Revert poetry.lock

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

---------

Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel authored May 3, 2024
1 parent 15bf90b commit a0e0562
Show file tree
Hide file tree
Showing 10 changed files with 898 additions and 40 deletions.
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ list-backups:
description: Lists backups in s3 storage in AWS.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
params:
force-promotion:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
description: Restore a database backup using pgBackRest.
S3 credentials are retrieved from a relation with the S3 integrator charm.
Expand Down
14 changes: 9 additions & 5 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 25
LIBPATCH = 26

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -476,11 +476,11 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return

# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
Expand All @@ -490,6 +490,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
Expand Down
8 changes: 8 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
limit: 1
optional: true
database:
interface: postgresql_client
db:
Expand All @@ -51,6 +55,10 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
limit: 1
optional: true
certificates:
interface: tls-certificates
limit: 1
Expand Down
13 changes: 13 additions & 0 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
event.fail(error_message)
return False

logger.info("Checking that the cluster is not replicating data to a standby cluster")
for relation in [
self.model.get_relation(ASYNC_REPLICA_RELATION),
self.model.get_relation(ASYNC_PRIMARY_RELATION),
]:
if not relation:
continue
error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster"
logger.error(f"Restore failed: {error_message}")
event.fail(error_message)
return False

logger.info("Checking that this unit was already elected the leader unit")
if not self.charm.unit.is_leader():
error_message = "Unit cannot restore backup as it was not elected the leader unit yet"
Expand Down
96 changes: 76 additions & 20 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
Expand Down Expand Up @@ -163,6 +164,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.backup = PostgreSQLBackups(self, "s3-parameters")
self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint])
self.async_replication = PostgreSQLAsyncReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
Expand Down Expand Up @@ -351,6 +353,18 @@ def _get_endpoints_to_remove(self) -> List[str]:
endpoints_to_remove = list(set(old) - set(current))
return endpoints_to_remove

def get_unit_ip(self, unit: Unit) -> Optional[str]:
    """Get the IP address of a specific unit.

    Args:
        unit: the unit whose address is wanted.

    Returns:
        The bind address of the peer binding when ``unit`` is the current
        unit, the ``private-address`` value stored in the peer relation data
        when ``unit`` is a peer, or ``None`` when the unit is neither of them
        or no address is available for it.
    """
    # Check if host is current host.
    if unit == self.unit:
        address = self.model.get_binding(PEER).network.bind_address
    # Check if host is a peer.
    elif unit in self._peers.data:
        address = self._peers.data[unit].get("private-address")
    # Return None if the unit is neither a peer nor the current unit.
    else:
        return None

    # A missing address must stay None: str(None) would hand callers the
    # literal string "None", which looks like a valid (truthy) address.
    return None if address is None else str(address)

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
Expand All @@ -368,6 +382,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.postgresql_client_relation.update_read_only_endpoint()
self._remove_from_endpoints(endpoints_to_remove)

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_peer_relation_changed(self, event: HookEvent) -> None:
"""Reconfigure cluster members."""
# The cluster must be initialized first in the leader unit
Expand Down Expand Up @@ -412,9 +429,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
Expand All @@ -440,8 +461,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:
else:
self.unit_peer_data.pop("start-tls-server", None)

if not self.is_blocked:
self._set_active_status()
self.async_replication.handle_read_only_mode()

def _on_config_changed(self, event) -> None:
"""Handle configuration changes, like enabling plugins."""
Expand Down Expand Up @@ -471,6 +491,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self._set_active_status()

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

if not self.unit.is_leader():
return

Expand Down Expand Up @@ -498,6 +521,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
Expand Down Expand Up @@ -641,6 +667,25 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
self._add_to_endpoints(self._endpoint)

self._cleanup_old_cluster_resources()

if not self.fix_leader_annotation():
return

# Create resources and add labels needed for replication.
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

# Remove departing units when the leader changes.
self._remove_from_endpoints(self._get_endpoints_to_remove())

self._add_members(event)

def fix_leader_annotation(self) -> bool:
"""Fix the leader annotation if it's missing."""
client = Client()
try:
endpoint = client.get(Endpoints, name=self.cluster_name, namespace=self._namespace)
Expand All @@ -657,23 +702,11 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
except ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
return False
# Ignore the error only when the resource doesn't exist.
if e.status.code != 404:
raise e

# Create resources and add labels needed for replication.
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

# Remove departing units when the leader changes.
self._remove_from_endpoints(self._get_endpoints_to_remove())

self._add_members(event)
return True

def _create_pgdata(self, container: Container):
"""Create the PostgreSQL data directory."""
Expand Down Expand Up @@ -747,6 +780,8 @@ def _set_active_status(self):
try:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down Expand Up @@ -819,6 +854,9 @@ def _on_upgrade_charm(self, _) -> None:
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _patch_pod_labels(self, member: str) -> None:
"""Add labels required for replication to the current pod.
Expand Down Expand Up @@ -988,6 +1026,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_get_primary(self, event: ActionEvent) -> None:
Expand Down Expand Up @@ -1109,6 +1150,9 @@ def _on_update_status(self, _) -> None:
if self._handle_processes_failures():
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

self._set_active_status()

def _handle_processes_failures(self) -> bool:
Expand All @@ -1131,8 +1175,15 @@ def _handle_processes_failures(self) -> bool:
return False
return True

try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self.is_primary
not is_primary
and not is_standby_leader
and self._patroni.member_started
and not self._patroni.member_streaming
):
Expand Down Expand Up @@ -1170,6 +1221,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self._unit == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
    """Tell whether this unit is the standby leader.

    Compares this unit's ``_unit`` value with what Patroni's
    ``get_standby_leader(unit_name_pattern=True)`` returns.
    """
    this_unit = self._unit
    standby_leader = self._patroni.get_standby_leader(unit_name_pattern=True)
    return this_unit == standby_leader

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
WORKLOAD_OS_GROUP = "postgres"
WORKLOAD_OS_USER = "postgres"
METRICS_PORT = "9187"
POSTGRESQL_DATA_PATH = "/var/lib/postgresql/data/pgdata"
POSTGRES_LOG_FILES = [
"/var/log/pgbackrest/*",
"/var/log/postgresql/patroni.log",
Expand Down
Loading

0 comments on commit a0e0562

Please sign in to comment.