22 series.isin #23

Merged · 8 commits · Aug 5, 2024
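The core change in this PR replaces plain-list membership tests for the exclude list with pandas Series.isin checks. A minimal standalone sketch of the pattern, assuming pandas is installed (the names and values below are illustrative, not taken from the script):

import pandas as pd

# Hypothetical exclude list held as a pandas Series, as the script now does.
exclude = pd.Series(['skip_me', 'also_skip'])

folder = 'skip_me'
# Old check: `folder in exclude` against a plain Python list.
# New check: wrap the candidate in a list, test with isin(), reduce with any().
if exclude.isin([folder]).any():
    print(f'Skipping subfolder {folder} - excluded.')

# Filtering a list of subfolders against the same Series:
sub_folders = ['keep_me', 'skip_me', 'also_skip']
sub_folders = [s for s in sub_folders if not exclude.isin([s]).any()]
print(sub_folders)  # -> ['keep_me']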
38 changes: 29 additions & 9 deletions csd3-side/scripts/lsst-backup.py
@@ -422,6 +422,10 @@ def upload_and_callback(s3_host, access_key, secret_key, bucket_name, folder, fi
logfile.write(f'{result}\n')
return None

# def free_up_zip_memory(to_collate, parent_folder, index):
# del to_collate[parent_folder]['zips'][index]['zip_contents']
# print(f'Deleted zip contents object for {parent_folder}, zip index {index}, to free memory.')

def process_files(s3_host, access_key, secret_key, bucket_name, current_objects, exclude, local_dir, destination_dir, nprocs, perform_checksum, dryrun, log, global_collate, use_compression):
"""
Uploads files from a local directory to an S3 bucket in parallel.
@@ -447,6 +451,11 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_files_uploaded = 0
i = 0
#processed_files = []
if global_collate:
half_cores = nprocs // 2
zip_pool = Pool(processes=half_cores)
collate_ul_pool = Pool(processes=nprocs - half_cores)

pool = Pool(nprocs) # use 4 CPUs by default - very little speed-up, might drop multiprocessing and parallelise at shell level
#recursive loop over local folder
results = []
@@ -464,13 +473,14 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
elif len(files) == 0:
print(f'Skipping subfolder - no files.')
continue
if folder in exclude:
if exclude.isin([folder]).any():
print(f'Skipping subfolder {folder} - excluded.')
continue
# remove subfolders in exclude list
if len(sub_folders) > 0:
len_pre_exclude = len(sub_folders)
sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
sub_folders[:] = [sub_folder for sub_folder in sub_folders if not exclude.isin([sub_folder]).any()]
# sub_folders[:] = [sub_folder for sub_folder in sub_folders if sub_folder not in exclude]
print(f'Skipping {len_pre_exclude - len(sub_folders)} subfolders in {folder} - excluded. {len(sub_folders)} subfolders remaining.')

folder_files = [os.sep.join([folder, filename]) for filename in files]
@@ -653,7 +663,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
print(f'collating into: {len(chunks)} zip file(s)')
for id,chunk in enumerate(zip(chunks,chunk_files)):
# print(f'chunk {id} contains {len(chunk[0])} folders')
zip_results.append(pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
zip_results.append(zip_pool.apply_async(zip_folders, (parent_folder,chunk[0],chunk_files[0],use_compression,dryrun,id)))
zipped = 0
uploaded = []
total_zips = len(zip_results)
@@ -676,6 +686,8 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
# #[os.sep.join([destination_dir, os.path.relpath(filename, local_dir)]) for filename in folder_files]
# to_collate[parent_folder][id]['zip_object_name'] =

# tc_index = len(to_collate[parent_folder]['zips']) - 1

# check if zip_object_name exists in bucket and get its checksum
if current_objects.isin([to_collate[parent_folder]['zips'][-1]['zip_object_name']]).any():
existing_zip_checksum = bm.get_resource(access_key, secret_key, s3_host).Object(bucket_name,to_collate[parent_folder]['zips'][-1]['zip_object_name']).e_tag.strip('"')
@@ -690,7 +702,7 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
total_size_uploaded += len(zip_data)
total_files_uploaded += 1
print(f"Uploading {to_collate[parent_folder]['zips'][-1]['zip_object_name']}.")
results.append(pool.apply_async(upload_and_callback, args=(
results.append(collate_ul_pool.apply_async(upload_and_callback, args=(
s3_host,
access_key,
secret_key,
Expand All @@ -706,11 +718,17 @@ def process_files(s3_host, access_key, secret_key, bucket_name, current_objects,
len(zip_data),
total_size_uploaded,
total_files_uploaded,
True,
)))
True),
# callback=lambda _: free_up_zip_memory(to_collate, parent_folder, tc_index),
))

pool.close()
pool.join()
if global_collate:
zip_pool.close()
zip_pool.join()
collate_ul_pool.close()
collate_ul_pool.join()

# # Go!
if __name__ == '__main__':
@@ -789,15 +807,17 @@ def error(self, message):
dryrun = args.dryrun
use_compression = not args.no_compression # internally, flag turns *on* compression, but for user no-compression turns it off - makes flag more intuitive

exclude = []

if args.exclude:
exclude = args.exclude
exclude = pd.Series(args.exclude)
else:
exclude = pd.Series([], dtype=str)  # empty exclude list (Series.empty is a property, not a constructor)

print(f'Config: {args}')

if save_config:
with open(config_file, 'w') as f:
yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude, 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
yaml.dump({'bucket_name': bucket_name, 'local_path': local_dir, 'S3_prefix': prefix, 'S3_folder': sub_dirs, 'exclude': exclude.to_list(), 'nprocs': nprocs, 'no_collate': not global_collate, 'dryrun': dryrun, 'no_checksum': not perform_checksum, 'no_compression': not use_compression}, f)
sys.exit(0)

print(f'Symlinks will be replaced with the target file. A new file <symlink_file>.symlink will contain the symlink target path.')
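The other change in this file splits the worker pool in two when collation is enabled: half of nprocs zips folders while the remainder uploads the finished zips, and the commented-out free_up_zip_memory hook hints at a follow-up where the upload's callback releases the in-memory zip once the upload returns. A rough sketch of that shape, using placeholder work functions (make_zip, upload_zip, store) rather than the script's real ones:

from multiprocessing import Pool

def make_zip(name):                  # stand-in for zip_folders
    return name, b'zip-bytes'

def upload_zip(name, data):          # stand-in for upload_and_callback
    return f'uploaded {name} ({len(data)} bytes)'

def free_up_zip_memory(store, name):
    # Pool callbacks run in the parent process, so this frees the parent's copy.
    del store[name]['zip_contents']

if __name__ == '__main__':
    nprocs = 4
    half_cores = nprocs // 2
    zip_pool = Pool(processes=half_cores)                   # zips folders
    collate_ul_pool = Pool(processes=nprocs - half_cores)   # uploads the zips

    store = {}
    results = []
    zipped = [zip_pool.apply_async(make_zip, (n,)) for n in ('a', 'b')]
    for res in zipped:
        name, data = res.get()
        store[name] = {'zip_contents': data}
        results.append(collate_ul_pool.apply_async(
            upload_zip, args=(name, data),
            callback=lambda _, n=name: free_up_zip_memory(store, n)))

    for r in results:
        print(r.get())
    for p in (zip_pool, collate_ul_pool):
        p.close()
        p.join()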
25 changes: 25 additions & 0 deletions scripts/du.sh
@@ -0,0 +1,25 @@
#!/bin/bash

# Script to discover the size of datasets using a back-end node

#SBATCH -A IRIS-IP005-CPU
#SBATCH --job-name="volume"
#SBATCH --time=06:00:00
#SBATCH --partition=cclake
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --mem=3420mb

# Change to job working directory
# Edit to parent folder of folder(s) of interest
cd /rds/project/rds-lT5YGmtKack/ras81/butler_wide_20220930/data/u/ir-shir1/DRP
date
pwd -P
# the line below (active by default) reports all immediate subfolders
du -h --max-depth=1
# uncomment the below for a single named folder
# du -sh folder
date

# for multiple named folders, copy/paste the above
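Assuming Slurm is available and the cd path has been edited to the dataset of interest, the script would typically be submitted from a login node with:

sbatch scripts/du.sh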
25 changes: 21 additions & 4 deletions scripts/find_collated_zips.py
@@ -30,8 +30,9 @@

import re

def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
def get_zipfile_list(bucket_name, access_key, secret_key, s3_host, get_contents_metadata):
zipfile_list = []
contents_list = []
s3 = bm.get_resource(access_key, secret_key, s3_host)
s3_client = bm.get_client(access_key, secret_key, s3_host)
bucket = s3.Bucket(bucket_name)
@@ -47,8 +48,14 @@ def get_zipfile_list(bucket_name, access_key, secret_key, s3_host):
key = obj['Key']
if pattern.match(key):
zipfile_list.append(key)
if get_contents_metadata:
contents = bucket.Object(key).get()['Metadata']['zip-contents'].split(',')
contents_list.append(contents)
print(f'{key}: {contents}')
else:
print(f'{key}')


return zipfile_list
return zipfile_list, contents_list

def main():
epilog = ''
@@ -63,9 +70,14 @@ def error(self, message):
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument('--bucket-name','-b', type=str, help='Name of the S3 bucket.', required=True)
parser.add_argument('--check-contents','-c', action='store_true', help='Check the contents of the zip files from metadata exist in the bucket.')

args = parser.parse_args()
bucket_name = args.bucket_name
check_contents = args.check_contents

# Setup bucket object
s3_host = 'echo.stfc.ac.uk'
@@ -84,9 +96,14 @@ def error(self, message):
print(f'Bucket {bucket_name} not found in {s3_host}.')
sys.exit()

zipfile_list = get_zipfile_list(bucket_name, access_key, secret_key, s3_host)
zipfile_list, zipfile_contents = get_zipfile_list(bucket_name, access_key, secret_key, s3_host, check_contents)

print(zipfile_list)
if check_contents:
for i, contents in enumerate(zipfile_contents):
print(f'Zip file: {zipfile_list[i]}, {contents}')
else:
for zipfile in zipfile_list:
print(zipfile)

if __name__ == '__main__':
main()
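With the new flag, a typical invocation might look like the following (the bucket name is hypothetical):

python scripts/find_collated_zips.py --bucket-name my-bucket --check-contents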