Keep track of remote xlog and basebackup #367

Closed · wants to merge 18 commits
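
This change makes PGHoard keep an in-memory view of what is already in remote storage: a per-site list of uploaded WAL (xlog) file names and a per-site list of basebackup entries. The caches are filled lazily on the first handle_site() pass, extended by the transfer agent after each successful upload, and trimmed by delete_remote_basebackup() / delete_remote_wal_before(), so the scheduler no longer has to re-list the object store on every iteration.

The snippet below is a minimal sketch of that lazy per-site cache pattern, not pghoard code; Storage here is a stand-in for the rohmu transfer object and only needs the list_path() method used in the diff.

import os


class RemoteStateCache:
    """Illustrative only: lazily populated per-site view of remote WAL files."""

    def __init__(self, storage):
        self.storage = storage  # assumed to expose list_path(key) -> [{"name": ...}, ...]
        self.remote_xlog = {}   # site -> list of WAL file names known to be uploaded

    def xlogs_for_site(self, site, prefix):
        # List the bucket only the first time a site is seen; afterwards the
        # uploader appends names and the cleanup code removes them.
        if site not in self.remote_xlog:
            listing = self.storage.list_path(os.path.join(prefix, "xlog"))
            self.remote_xlog[site] = [os.path.basename(item["name"]) for item in listing]
        return self.remote_xlog[site]

    def record_upload(self, site, key):
        # Called after a successful transfer instead of re-listing the bucket.
        self.remote_xlog.setdefault(site, []).append(os.path.basename(key))

    def record_delete(self, site, name):
        # Keep the cache in sync when WAL segments are pruned.
        if site in self.remote_xlog:
            self.remote_xlog[site].remove(name)
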
87 changes: 60 additions & 27 deletions pghoard/pghoard.py
@@ -70,6 +70,9 @@ def __init__(self, config_path):
"startup_time": datetime.datetime.utcnow().isoformat(),
}
self.transfer_agent_state = {} # shared among transfer agents
# Keep track of remote xlog and basebackup
self.remote_xlog = {}
self.remote_basebackup = {}
self.load_config()
if self.config["transfer"]["thread_count"] > 1:
self.mp_manager = multiprocessing.Manager()
@@ -114,7 +117,8 @@ def __init__(self, config_path):
mp_manager=self.mp_manager,
transfer_queue=self.transfer_queue,
metrics=self.metrics,
shared_state_dict=self.transfer_agent_state)
shared_state_dict=self.transfer_agent_state,
pghoard=self)
self.transfer_agents.append(ta)

logutil.notify_systemd("READY=1")
@@ -254,6 +258,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
self.log.debug("Deleting wal_file: %r", wal_path)
try:
storage.delete_key(wal_path)
self.remote_xlog[site].remove(wal.name_for_tli_log_seg(tli, log, seg))
valid_timeline = True
except FileNotFoundFromStorageError:
if not valid_timeline or tli <= 1:
@@ -271,15 +276,15 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
self.log.exception("Problem deleting: %r", wal_path)
self.metrics.unexpected_exception(ex, where="delete_remote_wal_before")

def delete_remote_basebackup(self, site, basebackup, metadata):
def delete_remote_basebackup(self, site, basebackup):
start_time = time.monotonic()
storage = self.site_transfers.get(site)
main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup)
main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup["name"])
basebackup_data_files = [main_backup_key]

if metadata.get("format") == "pghoard-bb-v2":
if basebackup['metadata'].get("format") == "pghoard-bb-v2":
bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0]
with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=metadata,
with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=basebackup['metadata'],
key_lookup=config.key_lookup_for_site(self.config, site)) as input_obj:
bmeta = extract_pghoard_bb_v2_metadata(input_obj)
self.log.debug("PGHoard chunk metadata: %r", bmeta)
@@ -299,6 +304,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata):
except Exception as ex: # FIXME: don't catch all exceptions; pylint: disable=broad-except
self.log.exception("Problem deleting: %r", obj_key)
self.metrics.unexpected_exception(ex, where="delete_remote_basebackup")
self.remote_basebackup[site].remove(basebackup)
self.log.info("Deleted basebackup datafiles: %r, took: %.2fs",
', '.join(basebackup_data_files), time.monotonic() - start_time)

@@ -317,6 +323,17 @@ def get_remote_basebackups_info(self, site):
results.sort(key=lambda entry: entry["metadata"]["start-time"])
return results

def get_remote_xlogs_info(self, site):
storage = self.site_transfers.get(site)
if not storage:
storage_config = get_object_storage_config(self.config, site)
storage = get_transfer(storage_config)
self.site_transfers[site] = storage

site_config = self.config["backup_sites"][site]
results = storage.list_path(os.path.join(site_config["prefix"], "xlog"), with_metadata=False)
return [os.path.basename(x['name']) for x in results]

def patch_basebackup_info(self, *, entry, site_config):
# drop path from resulting list and convert timestamps
entry["name"] = os.path.basename(entry["name"])
@@ -335,36 +352,50 @@ def patch_basebackup_info(self, *, entry, site_config):
if "normalized-backup-time" not in metadata:
metadata["normalized-backup-time"] = self.get_normalized_backup_time(site_config, now=metadata["start-time"])

def determine_backups_to_delete(self, *, basebackups, site_config):
def determine_backups_to_delete(self, site):
"""Returns the basebackups in the given list that need to be deleted based on the given site configuration.
Note that `basebackups` is edited in place: any basebackups that need to be deleted are removed from it."""
site_config = self.config["backup_sites"][site]
allowed_basebackup_count = site_config["basebackup_count"]
if allowed_basebackup_count is None:
allowed_basebackup_count = len(basebackups)
allowed_basebackup_count = len(self.remote_basebackup[site])

basebackups_to_delete = []
while len(basebackups) > allowed_basebackup_count:
remote_basebackups = self.remote_basebackup[site][:]
for basebackup in remote_basebackups:
if (len(remote_basebackups) - len(basebackups_to_delete)) <= allowed_basebackup_count:
break
self.log.warning("Too many basebackups: %d > %d, %r, starting to get rid of %r",
len(basebackups), allowed_basebackup_count, basebackups, basebackups[0]["name"])
basebackups_to_delete.append(basebackups.pop(0))
len(self.remote_basebackup[site]),
allowed_basebackup_count,
self.remote_basebackup[site],
basebackup["name"])
basebackups_to_delete.append(basebackup)
for basebackup in basebackups_to_delete:
remote_basebackups.remove(basebackup)

backup_interval = datetime.timedelta(hours=site_config["basebackup_interval_hours"])
min_backups = site_config["basebackup_count_min"]
max_age_days = site_config.get("basebackup_age_days_max")
current_time = datetime.datetime.now(datetime.timezone.utc)
if max_age_days and min_backups > 0:
while basebackups and len(basebackups) > min_backups:
for basebackup in remote_basebackups:
if (len(remote_basebackups) - len(basebackups_to_delete)) <= min_backups:
break
# For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when
# backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old.
completed_at = basebackups[0]["metadata"]["start-time"] + backup_interval
completed_at = basebackup["metadata"]["start-time"] + backup_interval
backup_age = current_time - completed_at
# timedelta would have direct `days` attribute but that's an integer rounded down. We want a float
# so that we can react immediately when age is too old
backup_age_days = backup_age.total_seconds() / 60.0 / 60.0 / 24.0
if backup_age_days > max_age_days:
self.log.warning("Basebackup %r too old: %.3f > %.3f, %r, starting to get rid of it",
basebackups[0]["name"], backup_age_days, max_age_days, basebackups)
basebackups_to_delete.append(basebackups.pop(0))
basebackup["name"],
backup_age_days,
max_age_days,
self.remote_basebackup)
basebackups_to_delete.append(basebackup)
else:
break

@@ -373,25 +404,20 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
def refresh_backup_list_and_delete_old(self, site):
"""Look up basebackups from the object store, prune any extra
backups and return the datetime of the latest backup."""
basebackups = self.get_remote_basebackups_info(site)
self.log.debug("Found %r basebackups", basebackups)
self.log.debug("Found %r basebackups", self.remote_basebackup[site])

site_config = self.config["backup_sites"][site]
# Never delete backups from a recovery site. This check is already elsewhere as well
# but still check explicitly here to ensure we certainly won't delete anything unexpectedly
if site_config["active"]:
basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config)

basebackups_to_delete = self.determine_backups_to_delete(site)
for basebackup_to_be_deleted in basebackups_to_delete:
pg_version = basebackup_to_be_deleted["metadata"].get("pg-version")
last_wal_segment_still_needed = 0
if basebackups:
last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"]
self.delete_remote_basebackup(site, basebackup_to_be_deleted)

if last_wal_segment_still_needed:
self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)
self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"])
self.state["backup_sites"][site]["basebackups"] = basebackups
if len(basebackups_to_delete) > 0 and len(self.remote_basebackup[site]) > 0:
pg_version = basebackups_to_delete[0]["metadata"].get("pg-version")
last_wal_segment_still_needed = self.remote_basebackup[site][0]["metadata"]["start-wal-segment"]
self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)

def get_normalized_backup_time(self, site_config, *, now=None):
"""Returns the closest historical backup time that current time matches to (or current time if it matches).
@@ -508,6 +534,13 @@ def handle_site(self, site, site_config):
if not site_config["active"]:
return # If a site has been marked inactive, don't bother checking anything

if site not in self.remote_xlog or site not in self.remote_basebackup:
self.log.info("Retrieving info from remote storage for %s", site)
self.remote_xlog[site] = self.get_remote_xlogs_info(site)
self.remote_basebackup[site] = self.get_remote_basebackups_info(site)
self.state["backup_sites"][site]["basebackups"] = self.remote_basebackup[site]
self.log.info("Remote info updated for %s", site)

self._cleanup_inactive_receivexlogs(site)

chosen_backup_node = random.choice(site_config["nodes"])
@@ -557,7 +590,7 @@ def get_new_backup_details(self, *, now=None, site, site_config):
be created at this time"""
if not now:
now = datetime.datetime.now(datetime.timezone.utc)
basebackups = self.state["backup_sites"][site]["basebackups"]
basebackups = self.remote_basebackup[site]
backup_hour = site_config.get("basebackup_hour")
backup_minute = site_config.get("basebackup_minute")
backup_reason = None
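
For reference, determine_backups_to_delete() above applies two retention rules to the cached, oldest-first basebackup list: keep at most basebackup_count backups, then drop backups older than basebackup_age_days_max while always keeping at least basebackup_count_min. A standalone sketch of those rules, using illustrative argument names rather than the real site_config keys, could look like this:

import datetime


def backups_to_delete(backups, *, count, count_min, max_age_days, interval_hours, now=None):
    """Return the oldest-first backups that the retention rules say should go.

    `backups` is assumed to be sorted by metadata["start-time"], oldest first,
    as get_remote_basebackups_info() returns them.
    """
    now = now or datetime.datetime.now(datetime.timezone.utc)
    interval = datetime.timedelta(hours=interval_hours)
    remaining = list(backups)
    to_delete = []

    # Rule 1: keep at most `count` backups (None means no count-based limit).
    limit = count if count is not None else len(remaining)
    while len(remaining) > limit:
        to_delete.append(remaining.pop(0))

    # Rule 2: drop backups older than max_age_days, keeping at least count_min.
    # Age is measured from start-time + backup interval, matching the code above.
    if max_age_days and count_min > 0:
        while len(remaining) > count_min:
            completed_at = remaining[0]["metadata"]["start-time"] + interval
            age_days = (now - completed_at).total_seconds() / 86400.0
            if age_days > max_age_days:
                to_delete.append(remaining.pop(0))
            else:
                break

    return to_delete
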
14 changes: 13 additions & 1 deletion pghoard/transfer.py
@@ -24,7 +24,7 @@

class TransferAgent(Thread):
def __init__(self, config, compression_queue, mp_manager, transfer_queue, metrics,
shared_state_dict):
shared_state_dict, pghoard):
super().__init__()
self.log = logging.getLogger("TransferAgent")
self.config = config
@@ -36,6 +36,7 @@ def __init__(self, config, compression_queue, mp_manager, transfer_queue, metrics,
self.running = True
self.sleep = time.sleep
self.state = shared_state_dict
self.pghoard = pghoard
self.site_transfers = {}
self.log.debug("TransferAgent initialized")

@@ -252,6 +253,17 @@ def handle_upload(self, site, key, file_to_transfer):
except Exception as ex: # pylint: disable=broad-except
self.log.exception("Problem in deleting file: %r", file_to_transfer["local_path"])
self.metrics.unexpected_exception(ex, where="handle_upload_unlink")

# update the in-memory view of remote xlog and basebackup
if file_to_transfer.get('filetype') == 'xlog':
self.pghoard.remote_xlog[site].append(os.path.basename(key))
elif file_to_transfer.get('filetype') == 'basebackup':
new_basebackup = list(storage.iter_key(key, include_key=True))[0].value
# patch metadata
self.pghoard.patch_basebackup_info(entry=new_basebackup,
site_config=self.pghoard.config["backup_sites"][site])
self.pghoard.remote_basebackup[site].append(new_basebackup)

return {"success": True, "opaque": file_to_transfer.get("opaque")}
except Exception as ex: # pylint: disable=broad-except
if file_to_transfer.get("retry_number", 0) > 0:
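
The tail of handle_upload() above is what keeps the caches warm: after a successful upload the new object is pushed into pghoard.remote_xlog or pghoard.remote_basebackup instead of waiting for the next remote listing. A reduced sketch of that update step, with a hypothetical `cache` object standing in for the PGHoard instance:

import os


def record_remote_upload(cache, storage, site, key, filetype, site_config):
    # Illustrative helper mirroring the cache update after a successful upload.
    if filetype == "xlog":
        cache.remote_xlog.setdefault(site, []).append(os.path.basename(key))
    elif filetype == "basebackup":
        # Fetch the freshly uploaded object's listing entry and normalize it the
        # same way get_remote_basebackups_info() does before caching it.
        entry = list(storage.iter_key(key, include_key=True))[0].value
        cache.patch_basebackup_info(entry=entry, site_config=site_config)
        cache.remote_basebackup.setdefault(site, []).append(entry)
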
12 changes: 11 additions & 1 deletion test/base.py
@@ -6,7 +6,7 @@
"""
# pylint: disable=attribute-defined-outside-init
from pghoard.config import find_pg_binary, set_and_check_config_defaults
from pghoard.rohmu import compat
from pghoard.rohmu import compat, dates
from shutil import rmtree
from tempfile import mkdtemp
import logging
@@ -96,6 +96,16 @@ def config_template(self, override=None):
def setup_method(self, method):
self.temp_dir = mkdtemp(prefix=self.__class__.__name__)
self.test_site = "site_{}".format(method.__name__)
self.remote_xlog = {}
self.remote_basebackup = {}
self.remote_xlog[self.test_site] = []
self.remote_basebackup[self.test_site] = []

def teardown_method(self, method): # pylint: disable=unused-argument
rmtree(self.temp_dir)

def patch_basebackup_info(self, *, entry, site_config): # pylint: disable=unused-argument
# drop path from resulting list and convert timestamps
entry["name"] = os.path.basename(entry["name"])
metadata = entry["metadata"]
metadata["start-time"] = dates.parse_timestamp(metadata["start-time"])
6 changes: 6 additions & 0 deletions test/conftest.py
@@ -274,6 +274,12 @@ def pghoard_base(db, tmpdir, request, compression="snappy", # pylint: disable=

pgh = PGHoard(confpath)
pgh.test_site = test_site
pgh.remote_xlog = {}
pgh.remote_basebackup = {}
pgh.remote_xlog[pgh.test_site] = []
pgh.remote_basebackup[pgh.test_site] = []
pgh.set_state_defaults(pgh.test_site)
pgh.state["backup_sites"][pgh.test_site]["basebackups"] = pgh.remote_basebackup[pgh.test_site]
pgh.start_threads_on_startup()
if compression == "snappy":
pgh.Compressor = snappy.StreamCompressor
8 changes: 4 additions & 4 deletions test/test_basebackup.py
@@ -557,7 +557,7 @@ def test_handle_site(self, pghoard):
# now call handle_site so it notices the backup has finished (this must not start a new one)
pghoard.handle_site(pghoard.test_site, site_config)
assert pghoard.test_site not in pghoard.basebackups
first_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
first_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
assert first_basebackups[0]["metadata"]["backup-reason"] == "scheduled"
assert first_basebackups[0]["metadata"]["backup-decision-time"]
assert first_basebackups[0]["metadata"]["normalized-backup-time"] is None
@@ -570,7 +570,7 @@ def test_handle_site(self, pghoard):
pghoard.handle_site(pghoard.test_site, site_config)
assert pghoard.test_site not in pghoard.basebackups

second_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
second_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
second_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
assert second_basebackups == first_basebackups
assert second_time_of_check > first_time_of_check
@@ -584,7 +584,7 @@ def test_handle_site(self, pghoard):
pghoard.handle_site(pghoard.test_site, site_config)
assert pghoard.test_site not in pghoard.basebackups

third_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
third_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
third_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
assert third_basebackups != second_basebackups
assert third_time_of_check > second_time_of_check
@@ -594,7 +594,7 @@ def test_handle_site(self, pghoard):
pghoard.handle_site(pghoard.test_site, site_config)
assert pghoard.test_site not in pghoard.basebackups

fourth_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
fourth_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
fourth_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
assert fourth_basebackups == third_basebackups
assert fourth_time_of_check == third_time_of_check