diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py
index 2eb1311b..f6ca9a8c 100644
--- a/pghoard/pghoard.py
+++ b/pghoard/pghoard.py
@@ -70,6 +70,9 @@ def __init__(self, config_path):
             "startup_time": datetime.datetime.utcnow().isoformat(),
         }
         self.transfer_agent_state = {}  # shared among transfer agents
+        # Keep track of remote xlogs and basebackups
+        self.remote_xlog = {}
+        self.remote_basebackup = {}
         self.load_config()
         if self.config["transfer"]["thread_count"] > 1:
             self.mp_manager = multiprocessing.Manager()
@@ -114,7 +117,8 @@ def __init__(self, config_path):
                 mp_manager=self.mp_manager,
                 transfer_queue=self.transfer_queue,
                 metrics=self.metrics,
-                shared_state_dict=self.transfer_agent_state)
+                shared_state_dict=self.transfer_agent_state,
+                pghoard=self)
             self.transfer_agents.append(ta)

         logutil.notify_systemd("READY=1")
@@ -254,6 +258,7 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
             self.log.debug("Deleting wal_file: %r", wal_path)
             try:
                 storage.delete_key(wal_path)
+                self.remote_xlog[site].remove(wal.name_for_tli_log_seg(tli, log, seg))
                 valid_timeline = True
             except FileNotFoundFromStorageError:
                 if not valid_timeline or tli <= 1:
@@ -271,15 +276,15 @@ def delete_remote_wal_before(self, wal_segment, site, pg_version):
                 self.log.exception("Problem deleting: %r", wal_path)
                 self.metrics.unexpected_exception(ex, where="delete_remote_wal_before")

-    def delete_remote_basebackup(self, site, basebackup, metadata):
+    def delete_remote_basebackup(self, site, basebackup):
         start_time = time.monotonic()
         storage = self.site_transfers.get(site)
-        main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup)
+        main_backup_key = os.path.join(self.config["backup_sites"][site]["prefix"], "basebackup", basebackup["name"])
         basebackup_data_files = [main_backup_key]

-        if metadata.get("format") == "pghoard-bb-v2":
+        if basebackup['metadata'].get("format") == "pghoard-bb-v2":
             bmeta_compressed = storage.get_contents_to_string(main_backup_key)[0]
-            with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=metadata,
+            with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=basebackup['metadata'],
                                        key_lookup=config.key_lookup_for_site(self.config, site)) as input_obj:
                 bmeta = extract_pghoard_bb_v2_metadata(input_obj)
                 self.log.debug("PGHoard chunk metadata: %r", bmeta)
@@ -299,6 +304,7 @@ def delete_remote_basebackup(self, site, basebackup, metadata):
             except Exception as ex:  # FIXME: don't catch all exceptions; pylint: disable=broad-except
                 self.log.exception("Problem deleting: %r", obj_key)
                 self.metrics.unexpected_exception(ex, where="delete_remote_basebackup")
+        self.remote_basebackup[site].remove(basebackup)
         self.log.info("Deleted basebackup datafiles: %r, took: %.2fs",
                       ', '.join(basebackup_data_files), time.monotonic() - start_time)

@@ -317,6 +323,17 @@ def get_remote_basebackups_info(self, site):
         results.sort(key=lambda entry: entry["metadata"]["start-time"])
         return results

+    def get_remote_xlogs_info(self, site):
+        storage = self.site_transfers.get(site)
+        if not storage:
+            storage_config = get_object_storage_config(self.config, site)
+            storage = get_transfer(storage_config)
+            self.site_transfers[site] = storage
+
+        site_config = self.config["backup_sites"][site]
+        results = storage.list_path(os.path.join(site_config["prefix"], "xlog"), with_metadata=False)
+        return [os.path.basename(x['name']) for x in results]
+
     def patch_basebackup_info(self, *, entry, site_config):
         # drop path from resulting list and convert timestamps
         entry["name"] = os.path.basename(entry["name"])
@@ -335,36 +352,50 @@ def patch_basebackup_info(self, *, entry, site_config):
         if "normalized-backup-time" not in metadata:
             metadata["normalized-backup-time"] = self.get_normalized_backup_time(site_config, now=metadata["start-time"])

-    def determine_backups_to_delete(self, *, basebackups, site_config):
+    def determine_backups_to_delete(self, site):
         """Returns the basebackups in the given list that need to be deleted based on the given site configuration.
         Note that `basebackups` is edited in place: any basebackups that need to be deleted are removed from it."""
+        site_config = self.config["backup_sites"][site]
         allowed_basebackup_count = site_config["basebackup_count"]
         if allowed_basebackup_count is None:
-            allowed_basebackup_count = len(basebackups)
+            allowed_basebackup_count = len(self.remote_basebackup[site])

         basebackups_to_delete = []
-        while len(basebackups) > allowed_basebackup_count:
+        remote_basebackups = self.remote_basebackup[site][:]
+        for basebackup in remote_basebackups:
+            if (len(remote_basebackups) - len(basebackups_to_delete)) <= allowed_basebackup_count:
+                break
             self.log.warning("Too many basebackups: %d > %d, %r, starting to get rid of %r",
-                             len(basebackups), allowed_basebackup_count, basebackups, basebackups[0]["name"])
-            basebackups_to_delete.append(basebackups.pop(0))
+                             len(self.remote_basebackup[site]),
+                             allowed_basebackup_count,
+                             self.remote_basebackup[site],
+                             basebackup["name"])
+            basebackups_to_delete.append(basebackup)
+        for basebackup in basebackups_to_delete:
+            remote_basebackups.remove(basebackup)

         backup_interval = datetime.timedelta(hours=site_config["basebackup_interval_hours"])
         min_backups = site_config["basebackup_count_min"]
         max_age_days = site_config.get("basebackup_age_days_max")
         current_time = datetime.datetime.now(datetime.timezone.utc)
         if max_age_days and min_backups > 0:
-            while basebackups and len(basebackups) > min_backups:
+            for basebackup in remote_basebackups:
+                if (len(remote_basebackups) - len(basebackups_to_delete)) <= min_backups:
+                    break
                 # For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when
                 # backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old.
-                completed_at = basebackups[0]["metadata"]["start-time"] + backup_interval
+                completed_at = basebackup["metadata"]["start-time"] + backup_interval
                 backup_age = current_time - completed_at
                 # timedelta would have direct `days` attribute but that's an integer rounded down. We want a float
                 # so that we can react immediately when age is too old
                 backup_age_days = backup_age.total_seconds() / 60.0 / 60.0 / 24.0
                 if backup_age_days > max_age_days:
                     self.log.warning("Basebackup %r too old: %.3f > %.3f, %r, starting to get rid of it",
-                                     basebackups[0]["name"], backup_age_days, max_age_days, basebackups)
-                    basebackups_to_delete.append(basebackups.pop(0))
+                                     basebackup["name"],
+                                     backup_age_days,
+                                     max_age_days,
+                                     self.remote_basebackup)
+                    basebackups_to_delete.append(basebackup)
                 else:
                     break
@@ -373,25 +404,20 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
     def refresh_backup_list_and_delete_old(self, site):
         """Look up basebackups from the object store, prune any extra backups and return the datetime of the latest backup."""
-        basebackups = self.get_remote_basebackups_info(site)
-        self.log.debug("Found %r basebackups", basebackups)
+        self.log.debug("Found %r basebackups", self.remote_basebackup[site])

         site_config = self.config["backup_sites"][site]
         # Never delete backups from a recovery site. This check is already elsewhere as well
         # but still check explicitly here to ensure we certainly won't delete anything unexpectedly
         if site_config["active"]:
-            basebackups_to_delete = self.determine_backups_to_delete(basebackups=basebackups, site_config=site_config)
-
+            basebackups_to_delete = self.determine_backups_to_delete(site)
             for basebackup_to_be_deleted in basebackups_to_delete:
-                pg_version = basebackup_to_be_deleted["metadata"].get("pg-version")
-                last_wal_segment_still_needed = 0
-                if basebackups:
-                    last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"]
+                self.delete_remote_basebackup(site, basebackup_to_be_deleted)

-                if last_wal_segment_still_needed:
-                    self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)
-                self.delete_remote_basebackup(site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"])
-        self.state["backup_sites"][site]["basebackups"] = basebackups
+            if len(basebackups_to_delete) > 0 and len(self.remote_basebackup[site]) > 0:
+                pg_version = basebackups_to_delete[0]["metadata"].get("pg-version")
+                last_wal_segment_still_needed = self.remote_basebackup[site][0]["metadata"]["start-wal-segment"]
+                self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)

     def get_normalized_backup_time(self, site_config, *, now=None):
         """Returns the closest historical backup time that current time matches to (or current time if it matches).
@@ -508,6 +534,13 @@ def handle_site(self, site, site_config):
         if not site_config["active"]:
             return  # If a site has been marked inactive, don't bother checking anything

+        if site not in self.remote_xlog or site not in self.remote_basebackup:
+            self.log.info("Retrieving info from remote storage for %s", site)
+            self.remote_xlog[site] = self.get_remote_xlogs_info(site)
+            self.remote_basebackup[site] = self.get_remote_basebackups_info(site)
+            self.state["backup_sites"][site]["basebackups"] = self.remote_basebackup[site]
+            self.log.info("Remote info updated for %s", site)
+
         self._cleanup_inactive_receivexlogs(site)

         chosen_backup_node = random.choice(site_config["nodes"])
@@ -557,7 +590,7 @@ def get_new_backup_details(self, *, now=None, site, site_config):
         be created at this time"""
         if not now:
             now = datetime.datetime.now(datetime.timezone.utc)
-        basebackups = self.state["backup_sites"][site]["basebackups"]
+        basebackups = self.remote_basebackup[site]
         backup_hour = site_config.get("basebackup_hour")
         backup_minute = site_config.get("basebackup_minute")
         backup_reason = None
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index 7c06cd83..a311d0a8 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -24,7 +24,7 @@ class TransferAgent(Thread):
     def __init__(self, config, compression_queue, mp_manager, transfer_queue, metrics,
-                 shared_state_dict):
+                 shared_state_dict, pghoard):
         super().__init__()
         self.log = logging.getLogger("TransferAgent")
         self.config = config
@@ -36,6 +36,7 @@ def __init__(self, config, compression_queue, mp_manager, transfer_queue, metric
         self.running = True
         self.sleep = time.sleep
         self.state = shared_state_dict
+        self.pghoard = pghoard
         self.site_transfers = {}
         self.log.debug("TransferAgent initialized")

@@ -252,6 +253,17 @@ def handle_upload(self, site, key, file_to_transfer):
                 except Exception as ex:  # pylint: disable=broad-except
                     self.log.exception("Problem in deleting file: %r", file_to_transfer["local_path"])
                     self.metrics.unexpected_exception(ex, where="handle_upload_unlink")
+
+            # keep the in-memory view of remote xlogs and basebackups up to date
+            if file_to_transfer.get('filetype') == 'xlog':
+                self.pghoard.remote_xlog[site].append(os.path.basename(key))
+            elif file_to_transfer.get('filetype') == 'basebackup':
+                new_basebackup = list(storage.iter_key(key, include_key=True))[0].value
+                # patch metadata
+                self.pghoard.patch_basebackup_info(entry=new_basebackup,
+                                                   site_config=self.pghoard.config["backup_sites"][site])
+                self.pghoard.remote_basebackup[site].append(new_basebackup)
+
             return {"success": True, "opaque": file_to_transfer.get("opaque")}
         except Exception as ex:  # pylint: disable=broad-except
             if file_to_transfer.get("retry_number", 0) > 0:
diff --git a/test/base.py b/test/base.py
index 71f1f32c..453bfb22 100644
--- a/test/base.py
+++ b/test/base.py
@@ -6,7 +6,7 @@
 """
 # pylint: disable=attribute-defined-outside-init
 from pghoard.config import find_pg_binary, set_and_check_config_defaults
-from pghoard.rohmu import compat
+from pghoard.rohmu import compat, dates
 from shutil import rmtree
 from tempfile import mkdtemp
 import logging
@@ -96,6 +96,16 @@ def config_template(self, override=None):
     def setup_method(self, method):
         self.temp_dir = mkdtemp(prefix=self.__class__.__name__)
         self.test_site = "site_{}".format(method.__name__)
+        self.remote_xlog = {}
+        self.remote_basebackup = {}
+        self.remote_xlog[self.test_site] = []
+        self.remote_basebackup[self.test_site] = []

     def teardown_method(self, method):  # pylint: disable=unused-argument
         rmtree(self.temp_dir)
+
+    def patch_basebackup_info(self, *, entry, site_config):  # pylint: disable=unused-argument
+        # drop path from resulting list and convert timestamps
+        entry["name"] = os.path.basename(entry["name"])
+        metadata = entry["metadata"]
+        metadata["start-time"] = dates.parse_timestamp(metadata["start-time"])
diff --git a/test/conftest.py b/test/conftest.py
index 2c2ae616..316473ab 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -274,6 +274,12 @@ def pghoard_base(db, tmpdir, request, compression="snappy",  # pylint: disable=

     pgh = PGHoard(confpath)
     pgh.test_site = test_site
+    pgh.remote_xlog = {}
+    pgh.remote_basebackup = {}
+    pgh.remote_xlog[pgh.test_site] = []
+    pgh.remote_basebackup[pgh.test_site] = []
+    pgh.set_state_defaults(pgh.test_site)
+    pgh.state["backup_sites"][pgh.test_site]["basebackups"] = pgh.remote_basebackup[pgh.test_site]
     pgh.start_threads_on_startup()
     if compression == "snappy":
         pgh.Compressor = snappy.StreamCompressor
diff --git a/test/test_basebackup.py b/test/test_basebackup.py
index 02ce6f6f..068ef09f 100644
--- a/test/test_basebackup.py
+++ b/test/test_basebackup.py
@@ -557,7 +557,7 @@ def test_handle_site(self, pghoard):
         # now call handle_site so it notices the backup has finished (this must not start a new one)
         pghoard.handle_site(pghoard.test_site, site_config)
         assert pghoard.test_site not in pghoard.basebackups
-        first_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
+        first_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
         assert first_basebackups[0]["metadata"]["backup-reason"] == "scheduled"
         assert first_basebackups[0]["metadata"]["backup-decision-time"]
         assert first_basebackups[0]["metadata"]["normalized-backup-time"] is None
@@ -570,7 +570,7 @@ def test_handle_site(self, pghoard):
         pghoard.handle_site(pghoard.test_site, site_config)
         assert pghoard.test_site not in pghoard.basebackups

-        second_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
+        second_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
         second_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
         assert second_basebackups == first_basebackups
         assert second_time_of_check > first_time_of_check
@@ -584,7 +584,7 @@ def test_handle_site(self, pghoard):
         pghoard.handle_site(pghoard.test_site, site_config)
         assert pghoard.test_site not in pghoard.basebackups

-        third_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
+        third_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
         third_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
         assert third_basebackups != second_basebackups
         assert third_time_of_check > second_time_of_check
@@ -594,7 +594,7 @@ def test_handle_site(self, pghoard):
         pghoard.handle_site(pghoard.test_site, site_config)
         assert pghoard.test_site not in pghoard.basebackups

-        fourth_basebackups = pghoard.state["backup_sites"][pghoard.test_site]["basebackups"]
+        fourth_basebackups = pghoard.remote_basebackup[pghoard.test_site][:]
         fourth_time_of_check = pghoard.time_of_last_backup_check[pghoard.test_site]
         assert fourth_basebackups == third_basebackups
         assert fourth_time_of_check == third_time_of_check
diff --git a/test/test_pghoard.py b/test/test_pghoard.py
index 1d736f49..4611b167 100644
--- a/test/test_pghoard.py
+++ b/test/test_pghoard.py
@@ -109,70 +109,64 @@ def test_determine_backups_to_delete(self):
         now = datetime.datetime.now(datetime.timezone.utc)
         bbs = [
             {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=10, hours=4)}},
{"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=9, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=9, hours=1)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=8, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=7, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=6, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=6, hours=20)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=5, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=4, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=3, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=2, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(days=1, hours=4)}}, - {"name": "bb1", "metadata": {"start-time": now - datetime.timedelta(hours=4)}}, + {"name": "bb2", "metadata": {"start-time": now - datetime.timedelta(days=9, hours=4)}}, + {"name": "bb3", "metadata": {"start-time": now - datetime.timedelta(days=9, hours=1)}}, + {"name": "bb4", "metadata": {"start-time": now - datetime.timedelta(days=8, hours=4)}}, + {"name": "bb5", "metadata": {"start-time": now - datetime.timedelta(days=7, hours=4)}}, + {"name": "bb6", "metadata": {"start-time": now - datetime.timedelta(days=6, hours=4)}}, + {"name": "bb7", "metadata": {"start-time": now - datetime.timedelta(days=6, hours=20)}}, + {"name": "bb8", "metadata": {"start-time": now - datetime.timedelta(days=5, hours=4)}}, + {"name": "bb9", "metadata": {"start-time": now - datetime.timedelta(days=4, hours=4)}}, + {"name": "bb10", "metadata": {"start-time": now - datetime.timedelta(days=3, hours=4)}}, + {"name": "bb11", "metadata": {"start-time": now - datetime.timedelta(days=2, hours=4)}}, + {"name": "bb12", "metadata": {"start-time": now - datetime.timedelta(days=1, hours=4)}}, + {"name": "bb13", "metadata": {"start-time": now - datetime.timedelta(hours=4)}}, ] + basebackup_count = 4 site_config = { - "basebackup_count": 4, + "basebackup_count": basebackup_count, "basebackup_count_min": 2, "basebackup_interval_hours": 24, } - bbs_copy = list(bbs) - to_delete = self.pghoard.determine_backups_to_delete(basebackups=bbs_copy, site_config=site_config) - assert len(bbs_copy) == 4 - assert len(to_delete) == len(bbs) - len(bbs_copy) + self.pghoard.config["backup_sites"][self.test_site] = site_config + self.pghoard.remote_basebackup[self.test_site] = bbs + to_delete = self.pghoard.determine_backups_to_delete(self.test_site) + assert len(bbs) - len(to_delete) == basebackup_count + # check that pghoard delete oldest basebackups (first items of the list) assert to_delete == bbs[:len(to_delete)] - assert bbs_copy == bbs[len(to_delete):] site_config["basebackup_count"] = 16 site_config["basebackup_age_days_max"] = 8 - bbs_copy = list(bbs) - to_delete = self.pghoard.determine_backups_to_delete(basebackups=bbs_copy, site_config=site_config) + to_delete = self.pghoard.determine_backups_to_delete(self.test_site) # 3 of the backups are too old (start time + interval is over 8 days in the past) - assert len(bbs_copy) == 10 - assert len(to_delete) == len(bbs) - len(bbs_copy) + assert len(to_delete) == 3 assert to_delete == bbs[:len(to_delete)] - assert bbs_copy == bbs[len(to_delete):] site_config["basebackup_count"] = 9 - bbs_copy = list(bbs) - to_delete = 
-        to_delete = self.pghoard.determine_backups_to_delete(basebackups=bbs_copy, site_config=site_config)
+        site_config["basebackup_age_days_max"] = 10
+        to_delete = self.pghoard.determine_backups_to_delete(self.test_site)
         # basebackup_count trumps backup age and backups are removed even though they're not too old
-        assert len(bbs_copy) == 9
-        assert len(to_delete) == len(bbs) - len(bbs_copy)
+        # We have 13 basebackups, 12 with start-time < 10 days
+        # So based on basebackup_count = 9 pghoard should delete 4 backups (bb1, bb2, bb3, bb4)
+        # and with basebackup_age_days_max = 10 days pghoard should delete 1 backup (bb1)
+        assert len(to_delete) == 4
         assert to_delete == bbs[:len(to_delete)]
-        assert bbs_copy == bbs[len(to_delete):]

+        basebackup_count_min = 6
         site_config["basebackup_count"] = 16
         site_config["basebackup_age_days_max"] = 2
-        site_config["basebackup_count_min"] = 6
-        bbs_copy = list(bbs)
-        to_delete = self.pghoard.determine_backups_to_delete(basebackups=bbs_copy, site_config=site_config)
+        site_config["basebackup_count_min"] = basebackup_count_min
+        to_delete = self.pghoard.determine_backups_to_delete(self.test_site)
         # basebackup_count_min ensures not that many backups are removed even though they're too old
-        assert len(bbs_copy) == 6
-        assert len(to_delete) == len(bbs) - len(bbs_copy)
+        assert len(to_delete) == len(self.pghoard.remote_basebackup[self.test_site]) - basebackup_count_min
         assert to_delete == bbs[:len(to_delete)]
-        assert bbs_copy == bbs[len(to_delete):]

         site_config["basebackup_count_min"] = 2
-        bbs_copy = list(bbs)
-        to_delete = self.pghoard.determine_backups_to_delete(basebackups=bbs_copy, site_config=site_config)
+        to_delete = self.pghoard.determine_backups_to_delete(self.test_site)
         # 3 of the backups are new enough (start time less than 3 days in the past)
-        assert len(bbs_copy) == 3
-        assert len(to_delete) == len(bbs) - len(bbs_copy)
+        assert len(to_delete) == len(self.pghoard.remote_basebackup[self.test_site]) - 3
         assert to_delete == bbs[:len(to_delete)]
-        assert bbs_copy == bbs[len(to_delete):]

     def test_local_refresh_backup_list_and_delete_old(self):
         basebackup_storage_path = os.path.join(self.local_storage_dir, "basebackup")
@@ -181,7 +175,10 @@ def test_local_refresh_backup_list_and_delete_old(self):
         os.makedirs(wal_storage_path)

         self.pghoard.set_state_defaults(self.test_site)
-        assert self.pghoard.get_remote_basebackups_info(self.test_site) == []
+        self.pghoard.remote_basebackup[self.test_site] = self.pghoard.get_remote_basebackups_info(self.test_site)
+        self.pghoard.remote_xlog[self.test_site] = self.pghoard.get_remote_xlogs_info(self.test_site)
+        assert self.pghoard.remote_basebackup[self.test_site] == []
+        assert self.pghoard.remote_xlog[self.test_site] == []

         def write_backup_and_wal_files(what):
             for bb, wals in what.items():
@@ -197,8 +194,11 @@ def write_backup_and_wal_files(what):
                         "start-time": start_time.isoformat(),
                     }, fp)
                 for wal in wals:
-                    with open(os.path.join(wal_storage_path, wal), "wb") as fp:
+                    wal_path = os.path.join(wal_storage_path, wal)
+                    with open(wal_path, "wb") as fp:
                         fp.write(b"something")
+                    with open(wal_path + ".metadata", "w") as fp:
+                        json.dump({}, fp)

         backups_and_wals = {
             "2015-08-25_0": [
@@ -222,12 +222,15 @@ def write_backup_and_wal_files(what):
             ],
         }
         write_backup_and_wal_files(backups_and_wals)
-        basebackups = self.pghoard.get_remote_basebackups_info(self.test_site)
-        assert len(basebackups) == 4
+        self.pghoard.remote_basebackup[self.test_site] = self.pghoard.get_remote_basebackups_info(self.test_site)
+        self.pghoard.remote_xlog[self.test_site] = self.pghoard.get_remote_xlogs_info(self.test_site)
+        assert len(self.pghoard.remote_basebackup[self.test_site]) == 4
+        assert len(self.pghoard.remote_xlog[self.test_site]) == 9
         self.pghoard.refresh_backup_list_and_delete_old(self.test_site)
-        basebackups = self.pghoard.get_remote_basebackups_info(self.test_site)
-        assert len(basebackups) == 1
-        assert len(os.listdir(wal_storage_path)) == 3
+        self.pghoard.remote_basebackup[self.test_site] = self.pghoard.get_remote_basebackups_info(self.test_site)
+        assert len(self.pghoard.remote_basebackup[self.test_site]) == 1
+        assert len(self.pghoard.remote_xlog[self.test_site]) == 3
+        assert len(os.listdir(wal_storage_path)) == 2 * len(self.pghoard.remote_xlog[self.test_site])
         # Put all WAL segments between 1 and 9 in place to see that they're deleted and we don't try to go back
         # any further from TLI 1. Note that timeline 3 is now "empty" so deletion shouldn't touch timelines 2
         # or 1.
@@ -246,14 +249,17 @@ def write_backup_and_wal_files(what):
             ],
         }
         write_backup_and_wal_files(new_backups_and_wals)
-        assert len(os.listdir(wal_storage_path)) == 11
+        self.pghoard.remote_basebackup[self.test_site] = self.pghoard.get_remote_basebackups_info(self.test_site)
+        self.pghoard.remote_xlog[self.test_site] = self.pghoard.get_remote_xlogs_info(self.test_site)
+        assert len(self.pghoard.remote_xlog[self.test_site]) == 11
+        assert len(os.listdir(wal_storage_path)) == 2 * len(self.pghoard.remote_xlog[self.test_site])
         self.pghoard.refresh_backup_list_and_delete_old(self.test_site)
-        basebackups = self.pghoard.get_remote_basebackups_info(self.test_site)
-        assert len(basebackups) == 1
+        assert len(self.pghoard.remote_basebackup[self.test_site]) == 1
         expected_wal_count = len(backups_and_wals["2015-08-25_0"])
         expected_wal_count += len(new_backups_and_wals[""])
         expected_wal_count += len(new_backups_and_wals["2015-08-25_4"])
-        assert len(os.listdir(wal_storage_path)) == expected_wal_count
+        assert len(self.pghoard.remote_xlog[self.test_site]) == expected_wal_count
+        assert len(os.listdir(wal_storage_path)) == 2 * len(self.pghoard.remote_xlog[self.test_site])
         # Now put WAL files in place with no gaps anywhere
         gapless_backups_and_wals = {
             "2015-08-25_3": [
@@ -265,11 +271,14 @@ def write_backup_and_wal_files(what):
             ],
         }
         write_backup_and_wal_files(gapless_backups_and_wals)
-        assert len(os.listdir(wal_storage_path)) >= 10
+        self.pghoard.remote_basebackup[self.test_site] = self.pghoard.get_remote_basebackups_info(self.test_site)
+        self.pghoard.remote_xlog[self.test_site] = self.pghoard.get_remote_xlogs_info(self.test_site)
+        assert len(self.pghoard.remote_xlog[self.test_site]) >= 10
+        assert len(os.listdir(wal_storage_path)) == 2 * len(self.pghoard.remote_xlog[self.test_site])
         self.pghoard.refresh_backup_list_and_delete_old(self.test_site)
-        basebackups = self.pghoard.get_remote_basebackups_info(self.test_site)
-        assert len(basebackups) == 1
-        assert len(os.listdir(wal_storage_path)) == 1
+        assert len(self.pghoard.remote_basebackup[self.test_site]) == 1
+        assert len(self.pghoard.remote_xlog[self.test_site]) == 1
+        assert len(os.listdir(wal_storage_path)) == 2 * len(self.pghoard.remote_xlog[self.test_site])

     def test_alert_files(self):
         alert_file_path = os.path.join(self.config["alert_file_dir"], "test_alert")
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 695845f8..c20c834c 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -5,6 +5,13 @@
 See LICENSE for details
 """
 # pylint: disable=attribute-defined-outside-init
+import datetime
+import hashlib
+
+import dateutil
+
+from pghoard.rohmu.object_storage.base import IterKeyItem, KEY_TYPE_OBJECT
+
 from .base import PGHoardTestCase
 from pghoard import metrics
 from pghoard.rohmu.errors import FileNotFoundFromStorageError, StorageError
@@ -17,11 +24,39 @@
 class MockStorage(Mock):
+
+    def init(self):
+        if self.init_ok is not True:  # pylint: disable=access-member-before-definition
+            self.objects = {}
+            now = datetime.datetime.now(dateutil.tz.tzlocal())
+            self.sample_storage_date = "{} {}".format(now.isoformat().split("+", 1)[0], now.tzname())
+            self.init_ok = True
+
+    def setup_method(self):
+        self.init()
+
     def get_contents_to_string(self, key):  # pylint: disable=unused-argument
+        self.init()
         return b"joo", {"key": "value"}

     def store_file_from_disk(self, key, local_path, metadata, multipart=None):  # pylint: disable=unused-argument
-        pass
+        self.init()
+        self.objects[key] = local_path
+
+    def iter_key(self, key, *, with_metadata=True, deep=False, include_key=False):  # pylint: disable=unused-argument
+        self.init()
+        for item in self.objects:
+            if key == item[0:len(key)]:
+                yield IterKeyItem(
+                    type=KEY_TYPE_OBJECT,
+                    value={
+                        "last_modified": self.sample_storage_date,
+                        "md5": hashlib.md5(item.encode()).hexdigest(),
+                        "metadata": {"start-time": self.sample_storage_date},
+                        "name": item,
+                        "size": len(self.objects[item]),
+                    },
+                )


 class MockStorageRaising(Mock):
@@ -64,7 +99,8 @@ def setup_method(self, method):
             mp_manager=None,
             transfer_queue=self.transfer_queue,
             metrics=metrics.Metrics(statsd={}),
-            shared_state_dict={})
+            shared_state_dict={},
+            pghoard=self)
         self.transfer_agent.start()

     def teardown_method(self, method):
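
The pattern this patch introduces is: list the remote WAL segments and basebackups once per site (in handle_site) and afterwards keep the in-memory remote_xlog / remote_basebackup views in sync as the transfer agent uploads files and the pruning code deletes them, instead of re-listing the object store on every retention check. Below is a minimal, hypothetical sketch of that bookkeeping pattern, not part of the patch: the RemoteStateCache class and its method names are invented for illustration, and the storage object is only assumed to expose a rohmu-style list_path() call.

# Hypothetical sketch only -- mirrors the pattern of handle_site() (initial listing),
# TransferAgent.handle_upload() (append on upload) and delete_remote_*() (remove on delete).
import os


class RemoteStateCache:
    def __init__(self, storage, prefix):
        self.storage = storage        # assumed to provide list_path(key, with_metadata=...)
        self.prefix = prefix
        self.xlogs = None             # None means "remote listing not fetched yet"
        self.basebackups = None

    def ensure_loaded(self):
        # Fetch the remote listings only on first use; later changes are tracked incrementally.
        if self.xlogs is None:
            self.xlogs = [os.path.basename(item["name"])
                          for item in self.storage.list_path(os.path.join(self.prefix, "xlog"),
                                                             with_metadata=False)]
        if self.basebackups is None:
            self.basebackups = self.storage.list_path(os.path.join(self.prefix, "basebackup"))

    def record_upload(self, filetype, key, entry=None):
        # Called after a successful upload so the cache matches the remote store.
        if filetype == "xlog":
            self.xlogs.append(os.path.basename(key))
        elif filetype == "basebackup":
            self.basebackups.append(entry)

    def record_delete(self, filetype, name_or_entry):
        # Called after a successful delete so stale entries do not linger in the cache.
        if filetype == "xlog":
            self.xlogs.remove(name_or_entry)
        else:
            self.basebackups.remove(name_or_entry)

The trade-off, visible in the test changes above, is that every code path that mutates remote storage must also update the cached lists; a missed path would let retention decisions drift away from what the bucket actually contains.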