From 329afa5486592331d3575fa64aa02c1123a46216 Mon Sep 17 00:00:00 2001 From: Sijie Date: Tue, 28 Nov 2023 15:13:15 -0800 Subject: [PATCH 1/3] rename and relocate callback server wrapper --- .../callback_server_wrapper.py} | 109 +++++++++++------- src/loaders/common/loader_common_names.py | 3 + .../workspace_downloader.py | 33 +++++- .../workspace_uploader/workspace_uploader.py | 2 +- 4 files changed, 102 insertions(+), 45 deletions(-) rename src/loaders/{workspace_downloader/workspace_downloader_helper.py => common/callback_server_wrapper.py} (63%) diff --git a/src/loaders/workspace_downloader/workspace_downloader_helper.py b/src/loaders/common/callback_server_wrapper.py similarity index 63% rename from src/loaders/workspace_downloader/workspace_downloader_helper.py rename to src/loaders/common/callback_server_wrapper.py index 3ba1663a2..2c1dffa9d 100644 --- a/src/loaders/workspace_downloader/workspace_downloader_helper.py +++ b/src/loaders/common/callback_server_wrapper.py @@ -17,16 +17,20 @@ class Conf: """ Configuration class for the workspace downloader and workspace uploader scripts. """ + def __init__( - self, - job_dir: str, - output_dir: str, - worker_function: Callable, - kb_base_url: str = "https://ci.kbase.us/services/", - token_filepath: str | None = None, - workers: int = 5, - retrieve_sample: bool = False, - ignore_no_sample_error: bool = False, + self, + job_dir: str, + output_dir: str, + kb_base_url: str = "https://ci.kbase.us/services/", + token_filepath: str | None = None, + au_service_ver: str = "release", + workers: int = 5, + max_task: int = 20, + worker_function: Callable | None = None, + retrieve_sample: bool = False, + ignore_no_sample_error: bool = False, + workspace_downloader: bool = True, ) -> None: """ Initialize the configuration class. @@ -34,19 +38,26 @@ def __init__( Args: job_dir (str): The directory for SDK jobs per user. output_dir (str): The directory for a specific workspace id under sourcedata/ws. - worker_function (Callable): The function that will be called by the workers. kb_base_url (str): The base url of the KBase services. - token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment. + token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment. If not supplied, the token must be provided in the environment variable KB_AUTH_TOKEN. + au_service_ver (str): The service verison of AssemblyUtilClient. workers (int): The number of workers to use for multiprocessing. + max_task (int): The maxmium subtasks for the callback server. + worker_function (Callable): The function that will be called by the workers. retrieve_sample (bool): Whether to retrieve sample for each genome object. ignore_no_sample_error (bool): Whether to ignore the error when no sample data is found. + workspace_downloader (bool): Whether to be used for the workspace downloader script. """ + # common instance variables + ipv4 = loader_helper.get_ip() port = loader_helper.find_free_port() token = loader_helper.get_token(token_filepath) - self.retrieve_sample = retrieve_sample - self.ignore_no_sample_error = ignore_no_sample_error - ipv4 = loader_helper.get_ip() + + ws_url = os.path.join(kb_base_url, "ws") + callback_url = "http://" + ipv4 + ":" + str(port) + print("callback_url:", callback_url) + self._start_callback_server( docker.from_env(), uuid.uuid4().hex, @@ -54,32 +65,40 @@ def __init__( kb_base_url, token, port, + max_task, ipv4, ) - ws_url = os.path.join(kb_base_url, "ws") - sample_url = os.path.join(kb_base_url, "sampleservice") - callback_url = "http://" + ipv4 + ":" + str(port) - print("callback_url:", callback_url) - self.ws = Workspace(ws_url, token=token) - self.asu = AssemblyUtil(callback_url, token=token) - self.ss = SampleService(sample_url, token=token) + self.asu = AssemblyUtil(callback_url, service_ver=au_service_ver, token=token) - self.workers = workers self.output_dir = output_dir - self.input_queue = Queue() - self.output_queue = Queue() self.job_data_dir = loader_helper.make_job_data_dir(job_dir) - self.pools = Pool(workers, worker_function, [self]) + + self.logging = None + + # unique to downloader + if workspace_downloader: + if worker_function is None: + raise ValueError( + "worker_function cannot be None for the workspace downloader script" + ) + self.input_queue = Queue() + self.pools = Pool(workers, worker_function, [self]) + self.retrieve_sample = retrieve_sample + self.ignore_no_sample_error = ignore_no_sample_error + + sample_url = os.path.join(kb_base_url, "sampleservice") + self.ss = SampleService(sample_url, token=token) def _setup_callback_server_envs( - self, - job_dir: str, - kb_base_url: str, - token: str, - port: int, - ipv4: str, + self, + job_dir: str, + kb_base_url: str, + token: str, + port: int, + max_task: int, + ipv4: str, ) -> Tuple[dict[str, Union[int, str]], dict[str, dict[str, str]]]: """ Setup the environment variables and volumes for the callback server. @@ -89,6 +108,8 @@ def _setup_callback_server_envs( kb_base_url (str): The base url of the KBase services. token (str): The KBase token. port (int): The port number for the callback server. + max_task (int): The maxmium subtasks for the callback server. + ipv4: (str): The ipv4 address for the callback server. Returns: tuple: A tuple of the environment variables and volumes for the callback server. @@ -99,9 +120,11 @@ def _setup_callback_server_envs( # used by the callback server env["KB_AUTH_TOKEN"] = token + env["KB_ADMIN_AUTH_TOKEN"] = token # pass in admin_token to get catalog params env["KB_BASE_URL"] = kb_base_url env["JOB_DIR"] = job_dir env["CALLBACK_PORT"] = port + env["JR_MAX_TASKS"] = max_task env["CALLBACK_IP"] = ipv4 # specify an ipv4 address for the callback server # otherwise, the callback container will use the an ipv6 address @@ -116,14 +139,15 @@ def _setup_callback_server_envs( return env, vol def _start_callback_server( - self, - client: docker.client, - container_name: str, - job_dir: str, - kb_base_url: str, - token: str, - port: int, - ipv4: str, + self, + client: docker.client, + container_name: str, + job_dir: str, + kb_base_url: str, + token: str, + port: int, + max_task: int, + ipv4: str, ) -> None: """ Start the callback server. @@ -134,9 +158,13 @@ def _start_callback_server( job_dir (str): The directory for SDK jobs per user. kb_base_url (str): The base url of the KBase services. token (str): The KBase token. + max_task (int): The maxmium subtasks for the callback server. port (int): The port number for the callback server. + ipv4: (str): The ipv4 address for the callback server. """ - env, vol = self._setup_callback_server_envs(job_dir, kb_base_url, token, port, ipv4) + env, vol = self._setup_callback_server_envs( + job_dir, kb_base_url, token, port, max_task, ipv4 + ) self.container = client.containers.run( name=container_name, image=CALLBACK_IMAGE_NAME, @@ -151,5 +179,6 @@ def stop_callback_server(self) -> None: """ Stop the callback server. """ + self.logging = self.container.logs() self.container.stop() self.container.remove() diff --git a/src/loaders/common/loader_common_names.py b/src/loaders/common/loader_common_names.py index a8f26555c..1481dac8f 100644 --- a/src/loaders/common/loader_common_names.py +++ b/src/loaders/common/loader_common_names.py @@ -128,3 +128,6 @@ 'NEXT': 'https://next.kbase.us/services/', 'APPDEV': 'https://appdev.kbase.us/services/', 'PROD': 'https://kbase.us/services/'} + +# service_vers +SERVICE_VERSIONS = ["dev", "beta", "release"] \ No newline at end of file diff --git a/src/loaders/workspace_downloader/workspace_downloader.py b/src/loaders/workspace_downloader/workspace_downloader.py index 61e941207..2a0a34629 100644 --- a/src/loaders/workspace_downloader/workspace_downloader.py +++ b/src/loaders/workspace_downloader/workspace_downloader.py @@ -1,6 +1,8 @@ """ -usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION] [--root_dir ROOT_DIR] - [--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH] [--keep_job_dir] [--retrieve_sample] [--ignore_no_sample_error] +usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION] + [--root_dir ROOT_DIR] [--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH] + [--jr_max_tasks JR_MAX_TASKS] [--au_service_ver {dev,beta,release}] [--keep_job_dir] [--retrieve_sample] + [--ignore_no_sample_error] PROTOTYPE - Download genome files from the workspace service (WSS). @@ -22,6 +24,10 @@ --workers WORKERS Number of workers for multiprocessing (default: 5) --token_filepath TOKEN_FILEPATH A file path that stores KBase token + --jr_max_tasks JR_MAX_TASKS + The maxmium subtasks for the callback server (default: 20) + --au_service_ver {dev,beta,release} + The service version of AssemblyUtil client (default: release) --keep_job_dir Keep SDK job directory after download task is completed --retrieve_sample Retrieve sample for each genome object --ignore_no_sample_error @@ -55,7 +61,7 @@ import src.common.storage.collection_and_field_names as names from src.loaders.common import loader_common_names, loader_helper -from src.loaders.workspace_downloader.workspace_downloader_helper import Conf +from src.loaders.common.callback_server_wrapper import Conf # setup KB_AUTH_TOKEN as env or provide a token_filepath in --token_filepath # export KB_AUTH_TOKEN="your-kb-auth-token" @@ -373,6 +379,19 @@ def main(): type=str, help="A file path that stores KBase token", ) + optional.add_argument( + "--jr_max_tasks", + type=int, + default=20, + help="The maxmium subtasks for the callback server", + ) + optional.add_argument( + "--au_service_ver", + type=str, + choices=loader_common_names.SERVICE_VERSIONS, + default="release", + help="The service version of AssemblyUtil client", + ) optional.add_argument( "--keep_job_dir", action="store_true", @@ -398,6 +417,8 @@ def main(): env = args.env workers = args.workers token_filepath = args.token_filepath + max_task = args.jr_max_tasks + au_service_ver = args.au_service_ver keep_job_dir = args.keep_job_dir retrieve_sample = args.retrieve_sample ignore_no_sample_error = args.ignore_no_sample_error @@ -410,6 +431,8 @@ def main(): ) if workspace_id <= 0: parser.error(f"workspace_id needs to be > 0") + if max_task <= 0: + parser.error(f"jr_max_tasks needs to be > 0") if workers < 1 or workers > cpu_count(): parser.error(f"minimum worker is 1 and maximum worker is {cpu_count()}") @@ -438,10 +461,12 @@ def main(): conf = Conf( job_dir, output_dir, - _process_input, kb_base_url, token_filepath, + au_service_ver, workers, + max_task, + _process_input, retrieve_sample, ignore_no_sample_error, ) diff --git a/src/loaders/workspace_uploader/workspace_uploader.py b/src/loaders/workspace_uploader/workspace_uploader.py index 3a781bd8d..b76b84b61 100644 --- a/src/loaders/workspace_uploader/workspace_uploader.py +++ b/src/loaders/workspace_uploader/workspace_uploader.py @@ -59,7 +59,7 @@ import yaml from src.loaders.common import loader_common_names, loader_helper -from src.loaders.workspace_downloader.workspace_downloader_helper import Conf +from src.loaders.common.callback_server_wrapper import Conf # setup KB_AUTH_TOKEN as env or provide a token_filepath in --token_filepath # export KB_AUTH_TOKEN="your-kb-auth-token" From 6ca54bd495c39c33055a3077cf82dd78121f819e Mon Sep 17 00:00:00 2001 From: Sijie Date: Tue, 28 Nov 2023 15:49:44 -0800 Subject: [PATCH 2/3] test passed on callback server wrapper --- src/loaders/common/callback_server_wrapper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/loaders/common/callback_server_wrapper.py b/src/loaders/common/callback_server_wrapper.py index 2c1dffa9d..615c1bbe9 100644 --- a/src/loaders/common/callback_server_wrapper.py +++ b/src/loaders/common/callback_server_wrapper.py @@ -84,13 +84,14 @@ def __init__( "worker_function cannot be None for the workspace downloader script" ) self.input_queue = Queue() - self.pools = Pool(workers, worker_function, [self]) self.retrieve_sample = retrieve_sample self.ignore_no_sample_error = ignore_no_sample_error sample_url = os.path.join(kb_base_url, "sampleservice") self.ss = SampleService(sample_url, token=token) + self.pools = Pool(workers, worker_function, [self]) + def _setup_callback_server_envs( self, job_dir: str, From 1a8fc2cb8088c6b3f29f7c238087fc3a05091e64 Mon Sep 17 00:00:00 2001 From: Sijie Date: Tue, 28 Nov 2023 16:05:01 -0800 Subject: [PATCH 3/3] update import --- test/src/loaders/workspace_uploader/workspace_uploader_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/loaders/workspace_uploader/workspace_uploader_test.py b/test/src/loaders/workspace_uploader/workspace_uploader_test.py index 58ac3fda6..f63452d09 100644 --- a/test/src/loaders/workspace_uploader/workspace_uploader_test.py +++ b/test/src/loaders/workspace_uploader/workspace_uploader_test.py @@ -10,7 +10,7 @@ from src.clients.AssemblyUtilClient import AssemblyUtil from src.loaders.common import loader_helper -from src.loaders.workspace_downloader.workspace_downloader_helper import Conf +from src.loaders.common.callback_server_wrapper import Conf from src.loaders.workspace_uploader import workspace_uploader ASSEMBLY_DIR_NAMES = ["GCF_000979855.1", "GCF_000979175.1"]