Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RE2022-217: Workspace uploader performance #426

Merged
merged 42 commits into from
Feb 1, 2024
Merged
Changes from 3 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d215fd7
add dynamic batch_size upload
Xiangs18 Aug 22, 2023
1524c47
add comments for iterator and fix a bug
Xiangs18 Aug 22, 2023
f1ac0b4
finish clean up && testing
Xiangs18 Aug 23, 2023
48df915
correct tuple type hint
Xiangs18 Aug 23, 2023
f0fdadf
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Aug 23, 2023
66bb799
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Oct 11, 2023
0f07b4a
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Oct 30, 2023
1eea48f
update WS uploader script
Xiangs18 Nov 3, 2023
51a9225
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 3, 2023
8117061
remove workers and run pass all unit tests
Xiangs18 Nov 6, 2023
95d48c4
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 6, 2023
7bf05b8
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 7, 2023
19e52ae
add AU service_ver CLI
Xiangs18 Nov 15, 2023
2383f1b
print out logs and add env max_task
Xiangs18 Nov 20, 2023
d305518
container.conf setup && clean up
Xiangs18 Nov 21, 2023
20673cd
update setup_callback_server_logs fun
Xiangs18 Nov 21, 2023
dc20206
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 21, 2023
5972e2d
fix catalog param
Xiangs18 Nov 22, 2023
d71c661
update _upload_assembly_files_in_parallel logic && logs permission
Xiangs18 Nov 28, 2023
6ae5f4d
test passed new logic
Xiangs18 Nov 28, 2023
fc9f482
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 29, 2023
f515968
update callback server related files
Xiangs18 Nov 29, 2023
370b154
update config, admin_token, keynames, logs, aus_version
Xiangs18 Dec 1, 2023
ac5aee7
update comments
Xiangs18 Dec 1, 2023
54f89e9
move the upload specific code from Conf into the uploader script
Xiangs18 Dec 4, 2023
9ba9a55
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Dec 4, 2023
97911b2
simplify _gen logic; add desc for cbs wrapper; fix race condition
Xiangs18 Dec 7, 2023
e14b49d
change output_dir to keyword argument
Xiangs18 Dec 7, 2023
2e01b64
1. add load_version; 2. update failure recovery logic; 3.rename CLI a…
Xiangs18 Dec 19, 2023
d0f4d45
fix logic in workspace_uploader.py only
Xiangs18 Jan 19, 2024
744fe56
fix all the tests
Xiangs18 Jan 20, 2024
70def6a
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 20, 2024
c64ce1d
fix format
Xiangs18 Jan 20, 2024
e3a9b4c
fix race condition on containers.conf
Xiangs18 Jan 22, 2024
51d00da
add --as_catalog_admin CLI param
Xiangs18 Jan 22, 2024
1b09e3f
update CLI help message && file lock
Xiangs18 Jan 25, 2024
5caae42
update description && add try finally block for file lock
Xiangs18 Jan 26, 2024
9a91857
fix tests
Xiangs18 Jan 30, 2024
b75e0e1
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 30, 2024
a412797
remove redundant code
Xiangs18 Jan 31, 2024
779ae79
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 31, 2024
c92fe2f
correct typos
Xiangs18 Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 115 additions & 51 deletions src/loaders/workspace_uploader/workspace_uploader.py
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from datetime import datetime
Xiangs18 marked this conversation as resolved.
Show resolved Hide resolved
from multiprocessing import cpu_count
from pathlib import Path
from typing import Tuple
from typing import Generator, Tuple

import yaml

Expand All @@ -67,6 +67,7 @@
UPLOAD_FILE_EXT = ["genomic.fna.gz"] # uplaod only files that match given extensions
JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER = "/kb/module/work/tmp"
UPLOADED_YAML = "uploaded.yaml"
MEMORY_UTILIZATION = 0.01 # approximately 8 assemblies per batch


def _get_parser():
Expand Down Expand Up @@ -160,22 +161,27 @@ def _get_source_file(assembly_dir: str, assembly_file: str) -> str:
return src_file


def _upload_assembly_to_workspace(
conf: Conf,
workspace_id: int,
file_path: str,
assembly_name: str,
) -> str:
"""Upload an assembly file to workspace."""
def _upload_assemblies_to_workspace(
conf: Conf,
workspace_id: int,
file_paths: tuple[str],
assembly_names: tuple[str],
Xiangs18 marked this conversation as resolved.
Show resolved Hide resolved
) -> tuple[str]:
"""
Upload assembly files to the target workspace in batch. The bulk method fails
and an error will be thrown if any of the assembly files in batch fails to upload.
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved
"""
inputs = [{"file": f, "assembly_name": n}
for f, n in zip(file_paths, assembly_names)]

success, attempts, max_attempts = False, 0, 3
while attempts < max_attempts and not success:
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved
try:
time.sleep(attempts)
assembly_ref = conf.asu.save_assembly_from_fasta2(
assembly_ref = conf.asu.save_assemblies_from_fastas(
{
"file": {"path": file_path},
"workspace_id": workspace_id,
"assembly_name": assembly_name,
"inputs": inputs
}
)
success = True
Expand All @@ -185,11 +191,12 @@ def _upload_assembly_to_workspace(

if not success:
raise ValueError(
f"Upload Failed for {file_path} after {max_attempts} attempts!"
f"Upload Failed for {file_paths} after {max_attempts} attempts!"
)

upa = assembly_ref["upa"].replace("/", "_")
return upa
upas = tuple([result_dict["upa"].replace("/", "_")
for result_dict in assembly_ref["results"]])
return upas


def _read_upload_status_yaml_file(
Expand Down Expand Up @@ -342,40 +349,43 @@ def _process_input(conf: Conf) -> None:
print("Stopping")
break

upa = None
(
upload_env_key,
workspace_id,
container_internal_assembly_path,
host_assembly_dir,
assembly_name,
batch_container_internal_assembly_paths,
batch_host_assembly_dirs,
batch_assembly_names,
upload_dir,
counter,
file_counter,
assembly_files_len,
) = task

batch_upas = (None, ) * len(batch_assembly_names)
try:
upa = _upload_assembly_to_workspace(
conf, workspace_id, container_internal_assembly_path, assembly_name
)
_post_process(
upload_env_key,
workspace_id,
host_assembly_dir,
assembly_name,
upload_dir,
conf.output_dir,
upa
batch_upas = _upload_assemblies_to_workspace(
conf, workspace_id, batch_container_internal_assembly_paths, batch_assembly_names
)
for host_assembly_dir, assembly_name, upa in zip(
batch_host_assembly_dirs, batch_assembly_names, batch_upas
):
_post_process(
upload_env_key,
workspace_id,
host_assembly_dir,
assembly_name,
upload_dir,
conf.output_dir,
upa
)
except Exception as e:
print(f"Failed assembly name: {assembly_name}. Exception:")
print(f"Failed assembly names: {batch_assembly_names}. Exception:")
print(e)

conf.output_queue.put((assembly_name, upa))
conf.output_queue.put((batch_assembly_names, batch_upas))

if counter % 3000 == 0:
print(f"Assemblies processed: {counter}/{assembly_files_len}, "
f"Percentage: {counter / assembly_files_len * 100:.2f}%, "
if file_counter % 3000 == 0:
print(f"Assemblies processed: {file_counter}/{assembly_files_len}, "
f"Percentage: {file_counter / assembly_files_len * 100:.2f}%, "
f"Time: {datetime.now()}")


Expand Down Expand Up @@ -403,38 +413,42 @@ def _upload_assembly_files_in_parallel(
print(f"Start uploading {assembly_files_len} assembly files with {conf.workers} workers\n")

# Put the assembly files in the input_queue
counter = 1
for assembly_name, host_assembly_dir in wait_to_upload_assemblies.items():

container_internal_assembly_path = os.path.join(
JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, assembly_name
)
file_counter = 0
iter_counter = 0
for (
batch_container_internal_assembly_paths,
batch_host_assembly_dirs,
batch_assembly_names,
batch_size,
) in _gen(conf, wait_to_upload_assemblies):

file_counter += batch_size
conf.input_queue.put(
(
upload_env_key,
workspace_id,
container_internal_assembly_path,
host_assembly_dir,
assembly_name,
batch_container_internal_assembly_paths,
batch_host_assembly_dirs,
batch_assembly_names,
upload_dir,
counter,
file_counter,
assembly_files_len,
)
)

if counter % 5000 == 0:
print(f"Jobs added to the queue: {counter}/{assembly_files_len}, "
f"Percentage: {counter / assembly_files_len * 100:.2f}%, "
if file_counter % 5000 == 0:
print(f"Jobs added to the queue: {file_counter}/{assembly_files_len}, "
f"Percentage: {file_counter / assembly_files_len * 100:.2f}%, "
f"Time: {datetime.now()}")

counter += 1
iter_counter += 1

# Signal the workers to terminate when they finish uploading assembly files
for _ in range(conf.workers):
conf.input_queue.put(None)

results = [conf.output_queue.get() for _ in range(assembly_files_len)]
failed_names = [assembly_name for assembly_name, upa in results if upa is None]
results = [conf.output_queue.get() for _ in range(iter_counter)]
failed_names = [name for names, upas in results for name, upa in zip(names, upas) if upa is None]

# Close and join the processes
conf.pools.close()
Expand All @@ -443,6 +457,56 @@ def _upload_assembly_files_in_parallel(
return failed_names


def _gen(
    conf: Conf,
    wait_to_upload_assemblies: dict[str, str]
) -> Generator[Tuple[tuple[str], tuple[str], tuple[str], int], None, None]:
    """
    Yield batches of assembly files to upload.

    Files are accumulated greedily: a batch grows until adding the next file
    would push the cumulative size past ``max_cumsize``, at which point the
    current batch is yielded and a new one is started. A single file larger
    than ``max_cumsize`` is yielded as its own batch of one — no empty
    batches are ever produced (including for an empty input mapping).

    Args:
        conf: configuration object; only ``conf.job_data_dir`` is read here —
            the directory in which every assembly file must already exist.
        wait_to_upload_assemblies: mapping of assembly file name to the host
            directory containing that assembly.

    Yields:
        ``(container_internal_assembly_paths, host_assembly_dirs,
        assembly_names, batch_size)`` — the first three are parallel tuples
        sliced for the batch; ``batch_size`` is their common length (>= 1).

    Raises:
        ValueError: if any assembly file is missing from ``conf.job_data_dir``.
    """
    assembly_files_len = len(wait_to_upload_assemblies)
    assembly_names = tuple(wait_to_upload_assemblies.keys())
    host_assembly_dirs = tuple(wait_to_upload_assemblies.values())
    container_internal_assembly_paths = tuple(
        os.path.join(JOB_DIR_IN_ASSEMBLYUTIL_CONTAINER, assembly_name)
        for assembly_name in assembly_names
    )

    start_idx = 0
    cumsize = 0
    # to avoid memory issues, upload 1GB * MEMORY_UTILIZATION of assembly
    # files at a time
    max_cumsize = 1000000000 * MEMORY_UTILIZATION
    for idx in range(assembly_files_len):
        file_path = os.path.join(conf.job_data_dir, assembly_names[idx])
        if not os.path.exists(file_path):
            raise ValueError(f"{file_path} does not exist. "
                             f"Ensure file {assembly_names[idx]} exists in {conf.job_data_dir}")
        file_size = os.path.getsize(file_path)
        # accumulate assembly files until the total size exceeds max_cumsize;
        # always admit at least one file per batch (idx == start_idx) so an
        # oversized file becomes its own batch instead of yielding an empty one
        if file_size + cumsize <= max_cumsize or idx == start_idx:
            cumsize += file_size
        else:
            yield (
                container_internal_assembly_paths[start_idx:idx],
                host_assembly_dirs[start_idx:idx],
                assembly_names[start_idx:idx],
                idx - start_idx,
            )
            # reset start_idx and cumsize for the next batch
            start_idx = idx
            cumsize = file_size

    # yield the remaining assembly files, if any
    if assembly_files_len > start_idx:
        yield (
            container_internal_assembly_paths[start_idx:],
            host_assembly_dirs[start_idx:],
            assembly_names[start_idx:],
            assembly_files_len - start_idx,
        )


def main():
parser = _get_parser()
args = parser.parse_args()
Expand Down