From 2a4ddfa00e9bd4f1760340d15da46d23d96276f6 Mon Sep 17 00:00:00 2001
From: RachelTucker
Date: Wed, 2 Nov 2022 15:54:16 -0600
Subject: [PATCH] Waiting for multithreaded futures to finish and adding basic
 logging to helper functions. (#56)

---
 ds3/ds3Helpers.py     | 49 ++++++++++++++++++++++++++++++++++++++++---
 tests/helpersTests.py |  5 +++--
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/ds3/ds3Helpers.py b/ds3/ds3Helpers.py
index fd1023a..27b5510 100644
--- a/ds3/ds3Helpers.py
+++ b/ds3/ds3Helpers.py
@@ -10,6 +10,7 @@
 # specific language governing permissions and limitations under the License.
 import concurrent.futures
 import hashlib
+import logging
 import time
 import zlib
 from os import walk, path
@@ -124,6 +125,14 @@ def calculate_checksum_header(object_data_stream, checksum_type: str, length: in
     return {header_key: encoded_checksum}
 
 
+def done_callback(future: concurrent.futures.Future):
+    try:
+        result = future.result()
+        logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
+    except Exception as ex:
+        logging.error(f'{ex}')
+
+
 class Helper(object):
     """A class that moves data to and from a Black Pearl"""
 
@@ -186,6 +195,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))
 
         job_id = bulk_put.result['JobId']
+        logging.info(f'Created put job {job_id} with {len(put_objects)} objects.')
 
         blob_set: Set[Blob] = set()
         for chunk in bulk_put.result['ObjectsList']:
@@ -197,6 +207,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                 blob_set.add(cur_blob)
 
         # send until all blobs have been transferred
+        error_count: int = 0
         while len(blob_set) > 0:
             available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
                 GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -208,6 +219,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                 continue
 
             # retrieve all available blobs concurrently
+            futures: List[concurrent.futures.Future] = list()
             with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
                 for chunk in chunks:
                     for blob in chunk['ObjectList']:
@@ -220,8 +232,21 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
                             blob_set.remove(cur_blob)
                             put_object = put_objects_map[cur_blob.name]
-                            executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id,
-                                            checksum_type)
+                            future = executor.submit(self.put_blob, bucket, put_object, cur_blob.length,
+                                                     cur_blob.offset, job_id, checksum_type)
+                            future.add_done_callback(done_callback)
+                            futures.append(future)
+
+            # Wait for all futures to finish
+            concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+            for future in futures:
+                if future.exception() is not None:
+                    error_count += 1
+
+        if error_count > 0:
+            logging.warning(f'Completed job {job_id} with {error_count} errors.')
+        else:
+            logging.info(f'Completed job {job_id} with no errors.')
 
         return job_id
 
@@ -244,6 +269,7 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
                 job=job_id,
                 headers=headers))
         stream.close()
+        return put_object.object_name, offset
 
     def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
                                      max_threads: int = 5, calculate_checksum: bool = False,
@@ -329,6 +355,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                                          name=job_name))
 
         job_id = bulk_get.result['JobId']
+        logging.info(f'Created get job {job_id} with {len(get_objects)} objects.')
 
         blob_set: Set[Blob] = set()
         for chunk in bulk_get.result['ObjectsList']:
@@ -340,6 +367,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                 blob_set.add(cur_blob)
 
         # retrieve until all blobs have been transferred
+        error_count: int = 0
         while len(blob_set) > 0:
             available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
                 GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -351,6 +379,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                 continue
 
             # retrieve all available blobs concurrently
+            futures: List[concurrent.futures.Future] = list()
             with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
                 for chunk in chunks:
                     for blob in chunk['ObjectList']:
@@ -363,7 +392,20 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
                             blob_set.remove(cur_blob)
                             get_object = get_objects_map[cur_blob.name]
-                            executor.submit(self.get_blob, bucket, get_object, offset, job_id)
+                            future = executor.submit(self.get_blob, bucket, get_object, offset, job_id)
+                            future.add_done_callback(done_callback)
+                            futures.append(future)
+
+            # Wait for all futures to finish
+            concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
+            for future in futures:
+                if future.exception() is not None:
+                    error_count += 1
+
+        if error_count > 0:
+            logging.warning(f'Completed job {job_id} with {error_count} errors.')
+        else:
+            logging.info(f'Completed job {job_id} with no errors.')
 
         return job_id
 
@@ -376,6 +418,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
                 job=job_id,
                 version_id=get_object.version_id))
         stream.close()
+        return get_object.object_name, offset
 
     def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
                                 max_threads: int = 5, job_name: str = None) -> List[str]:
diff --git a/tests/helpersTests.py b/tests/helpersTests.py
index 400f87b..c916b58 100644
--- a/tests/helpersTests.py
+++ b/tests/helpersTests.py
@@ -8,7 +8,7 @@
 # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 # CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
-
+import logging
 import unittest
 import os
 import tempfile
@@ -21,6 +21,8 @@
 import xml.etree.ElementTree as xmlDom
 
+logging.basicConfig(level=logging.INFO)
+
 
 def create_files_in_directory(directory: str, num_files: int, root_dir: str, include_dirs: bool = True) -> List[
     ds3Helpers.HelperPutObject]:
@@ -253,7 +255,6 @@ def put_all_objects_in_directory_with_checksum(self, checksum_type: str):
         # fetch existing storage domain
         storage_domain = client.get_storage_domains_spectra_s3(ds3.GetStorageDomainsSpectraS3Request())
         storage_domain_id = storage_domain.result['StorageDomainList'][0]['Id']
-        print("test")
         data_persistence_rule = client.put_data_persistence_rule_spectra_s3(
             ds3.PutDataPersistenceRuleSpectraS3Request(data_policy_id=data_policy_id, isolation_level='standard',
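
Note: the pattern this patch introduces -- submit each blob transfer to a
ThreadPoolExecutor, attach a done-callback for per-blob logging, then block on
concurrent.futures.wait() and tally failures via Future.exception() -- is shown
in isolation in the sketch below. The transfer() task and blob ids are
hypothetical stand-ins for Helper.put_blob / Helper.get_blob; only the
concurrent.futures and logging calls mirror the patch itself.

    import concurrent.futures
    import logging

    logging.basicConfig(level=logging.INFO)


    def done_callback(future: concurrent.futures.Future):
        # As in the patch: Future.result() re-raises the task's exception,
        # so success and failure can both be logged as each task completes.
        try:
            result = future.result()
            logging.info(f'Finished transferring blob id={result}')
        except Exception as ex:
            logging.error(f'{ex}')


    def transfer(blob_id: int) -> int:
        # Hypothetical stand-in for a single blob transfer.
        if blob_id == 3:
            raise RuntimeError(f'blob {blob_id} failed')
        return blob_id


    error_count = 0
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for blob_id in range(5):
            future = executor.submit(transfer, blob_id)
            future.add_done_callback(done_callback)
            futures.append(future)

        # Block until every submitted task finishes, then count failures
        # explicitly instead of letting them pass silently.
        concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
        for future in futures:
            if future.exception() is not None:
                error_count += 1

    if error_count > 0:
        logging.warning(f'Completed with {error_count} errors.')
    else:
        logging.info('Completed with no errors.')

Exiting the with block already joins the worker threads, but joining alone
never surfaces task exceptions; checking Future.exception() after the wait is
what lets the job report an error count rather than swallowing failures.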