Waiting for multithreaded futures to finish and adding basic logging to helper functions. (#56)
RachelTucker authored Nov 2, 2022
1 parent b7d262d commit 2a4ddfa
Showing 2 changed files with 49 additions and 5 deletions.
49 changes: 46 additions & 3 deletions ds3/ds3Helpers.py
@@ -10,6 +10,7 @@
# specific language governing permissions and limitations under the License.
import concurrent.futures
import hashlib
import logging
import time
import zlib
from os import walk, path
@@ -124,6 +125,14 @@ def calculate_checksum_header(object_data_stream, checksum_type: str, length: in
return {header_key: encoded_checksum}


def done_callback(future: concurrent.futures.Future):
    try:
        result = future.result()
        logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
    except Exception as ex:
        logging.error(f'{ex}')


class Helper(object):
"""A class that moves data to and from a Black Pearl"""

@@ -186,6 +195,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))

job_id = bulk_put.result['JobId']
logging.info(f'Created put job {job_id} with {len(put_objects)} objects.')

blob_set: Set[Blob] = set()
for chunk in bulk_put.result['ObjectsList']:
@@ -197,6 +207,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
blob_set.add(cur_blob)

# send until all blobs have been transferred
error_count: int = 0
while len(blob_set) > 0:
available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -208,6 +219,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
continue

# retrieve all available blobs concurrently
futures: List[concurrent.futures.Future] = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
for chunk in chunks:
for blob in chunk['ObjectList']:
@@ -220,8 +232,21 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
blob_set.remove(cur_blob)
put_object = put_objects_map[cur_blob.name]

executor.submit(self.put_blob, bucket, put_object, cur_blob.length, cur_blob.offset, job_id,
checksum_type)
future = executor.submit(self.put_blob, bucket, put_object, cur_blob.length,
cur_blob.offset, job_id, checksum_type)
future.add_done_callback(done_callback)
futures.append(future)

# Wait for all futures to finish
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
for future in futures:
if future.exception() is not None:
error_count += 1

if error_count > 0:
logging.warning(f'Completed job {job_id} with {error_count} errors.')
else:
logging.info(f'Completed job {job_id} with no errors.')

return job_id

@@ -244,6 +269,7 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
job=job_id,
headers=headers))
stream.close()
return put_object.object_name, offset

def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
max_threads: int = 5, calculate_checksum: bool = False,
@@ -329,6 +355,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
name=job_name))

job_id = bulk_get.result['JobId']
logging.info(f'Created get job {job_id} with {len(get_objects)} objects.')

blob_set: Set[Blob] = set()
for chunk in bulk_get.result['ObjectsList']:
@@ -340,6 +367,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
blob_set.add(cur_blob)

# retrieve until all blobs have been transferred
error_count: int = 0
while len(blob_set) > 0:
available_chunks = self.client.get_job_chunks_ready_for_client_processing_spectra_s3(
GetJobChunksReadyForClientProcessingSpectraS3Request(job_id))
@@ -351,6 +379,7 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
continue

# retrieve all available blobs concurrently
futures: List[concurrent.futures.Future] = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
for chunk in chunks:
for blob in chunk['ObjectList']:
@@ -363,7 +392,20 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
blob_set.remove(cur_blob)
get_object = get_objects_map[cur_blob.name]

executor.submit(self.get_blob, bucket, get_object, offset, job_id)
future = executor.submit(self.get_blob, bucket, get_object, offset, job_id)
future.add_done_callback(done_callback)
futures.append(future)

# Wait for all futures to finish
concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
for future in futures:
if future.exception() is not None:
error_count += 1

if error_count > 0:
logging.warning(f'Completed job {job_id} with {error_count} errors.')
else:
logging.info(f'Completed job {job_id} with no errors.')

return job_id

@@ -376,6 +418,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
job=job_id,
version_id=get_object.version_id))
stream.close()
return get_object.object_name, offset

def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
max_threads: int = 5, job_name: str = None) -> List[str]:
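Taken together, the ds3Helpers.py changes follow one pattern: each call to executor.submit now yields a future that gets done_callback attached for per-blob logging, the futures are collected, and once the executor drains a batch the code blocks on concurrent.futures.wait and tallies failures into error_count so the job can end with a single summary log line. A minimal standalone sketch of that pattern follows; transfer_blob and the blob list are illustrative stand-ins for Helper.put_blob / Helper.get_blob and the chunk contents, not SDK code.

import concurrent.futures
import logging


def done_callback(future: concurrent.futures.Future):
    # Same shape as the helper's callback: workers return (name, offset)
    # on success; a failed transfer surfaces here as an exception.
    try:
        result = future.result()
        logging.info(f'Finished transferring blob name={result[0]}, offset={result[1]}')
    except Exception as ex:
        logging.error(f'{ex}')


def transfer_blob(name: str, offset: int):
    # Illustrative stand-in for Helper.put_blob / Helper.get_blob, which
    # stream data to or from the Black Pearl and return (name, offset).
    return name, offset


def run_batch(blobs, max_threads: int = 5) -> int:
    error_count = 0
    futures: list = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        for name, offset in blobs:
            future = executor.submit(transfer_blob, name, offset)
            future.add_done_callback(done_callback)
            futures.append(future)

    # Wait for all futures to finish, then count failures so the caller
    # can emit one "Completed job ... with N errors." style summary.
    concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
    for future in futures:
        if future.exception() is not None:
            error_count += 1
    return error_count


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    errors = run_batch([('movies/file1.txt', 0), ('movies/file1.txt', 1048576)])
    print(f'batch finished with {errors} errors')

Exiting the ThreadPoolExecutor context manager already joins the worker threads, so the explicit wait is mostly about making the completion point and the error-count pass explicit before the summary log line.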
5 changes: 3 additions & 2 deletions tests/helpersTests.py
@@ -8,7 +8,7 @@
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import logging
import unittest
import os
import tempfile
@@ -21,6 +21,8 @@

import xml.etree.ElementTree as xmlDom

logging.basicConfig(level=logging.INFO)


def create_files_in_directory(directory: str, num_files: int, root_dir: str,
include_dirs: bool = True) -> List[ds3Helpers.HelperPutObject]:
@@ -253,7 +255,6 @@ def put_all_objects_in_directory_with_checksum(self, checksum_type: str):
# fetch existing storage domain
storage_domain = client.get_storage_domains_spectra_s3(ds3.GetStorageDomainsSpectraS3Request())
storage_domain_id = storage_domain.result['StorageDomainList'][0]['Id']
print("test")

data_persistence_rule = client.put_data_persistence_rule_spectra_s3(
ds3.PutDataPersistenceRuleSpectraS3Request(data_policy_id=data_policy_id, isolation_level='standard',
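The helpersTests.py change is the consumer-side half of the logging addition: the helpers log through Python's root logger, so whoever calls them decides whether the new messages are visible. A caller that wants to see them could configure logging like this (the format string is just an illustrative choice, not part of the SDK):

import logging

# INFO surfaces the per-blob and per-job messages added in this commit;
# WARNING would keep only the "Completed job ... with N errors." summaries.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')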
