RE2022-217: Workspace uploader performance #426

Merged: 42 commits, Feb 1, 2024

Commits
d215fd7
add dynamic batch_size upload
Xiangs18 Aug 22, 2023
1524c47
add comments for iterator and fix a bug
Xiangs18 Aug 22, 2023
f1ac0b4
finish clean up && testing
Xiangs18 Aug 23, 2023
48df915
correct tuple type hint
Xiangs18 Aug 23, 2023
f0fdadf
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Aug 23, 2023
66bb799
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Oct 11, 2023
0f07b4a
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Oct 30, 2023
1eea48f
update WS uploader script
Xiangs18 Nov 3, 2023
51a9225
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 3, 2023
8117061
remove workers and run pass all unit tests
Xiangs18 Nov 6, 2023
95d48c4
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 6, 2023
7bf05b8
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 7, 2023
19e52ae
add AU service_ver CLI
Xiangs18 Nov 15, 2023
2383f1b
print out logs and add env max_task
Xiangs18 Nov 20, 2023
d305518
container.conf setup && clean up
Xiangs18 Nov 21, 2023
20673cd
update setup_callback_server_logs fun
Xiangs18 Nov 21, 2023
dc20206
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 21, 2023
5972e2d
fix catalog param
Xiangs18 Nov 22, 2023
d71c661
update _upload_assembly_files_in_parallel logic && logs permission
Xiangs18 Nov 28, 2023
6ae5f4d
test passed new logic
Xiangs18 Nov 28, 2023
fc9f482
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Nov 29, 2023
f515968
update callback server related files
Xiangs18 Nov 29, 2023
370b154
update config, admin_token, keynames, logs, aus_version
Xiangs18 Dec 1, 2023
ac5aee7
update comments
Xiangs18 Dec 1, 2023
54f89e9
move the upload specific code from Conf into the uploader script
Xiangs18 Dec 4, 2023
9ba9a55
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Dec 4, 2023
97911b2
simplify _gen logic; add descfor cbs wrapper; fix race condition
Xiangs18 Dec 7, 2023
e14b49d
change output_dir to keyword argument
Xiangs18 Dec 7, 2023
2e01b64
1. add load_version; 2. update failure recovery logic; 3.rename CLI a…
Xiangs18 Dec 19, 2023
d0f4d45
fix logic in workspace_uploader.py only
Xiangs18 Jan 19, 2024
744fe56
fix all the tests
Xiangs18 Jan 20, 2024
70def6a
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 20, 2024
c64ce1d
fix format
Xiangs18 Jan 20, 2024
e3a9b4c
fix race condition on containers.conf
Xiangs18 Jan 22, 2024
51d00da
add --as_catalog_admin CLI param
Xiangs18 Jan 22, 2024
1b09e3f
update CLI help message && file lock
Xiangs18 Jan 25, 2024
5caae42
update description && add try finally block for file lock
Xiangs18 Jan 26, 2024
9a91857
fix tests
Xiangs18 Jan 30, 2024
b75e0e1
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 30, 2024
a412797
remove redudant code
Xiangs18 Jan 31, 2024
779ae79
Merge branch 'main' into dv-WS_uploader_performance
Xiangs18 Jan 31, 2024
c92fe2f
correct typos
Xiangs18 Jan 31, 2024
167 changes: 123 additions & 44 deletions src/loaders/common/callback_server_wrapper.py
@@ -16,70 +16,116 @@
class Conf:
"""
Configuration class for the workspace downloader and workspace uploader scripts.

Instance variables:

token - a KBase token appropriate for the KBase environment
callback_url - the url of the callback service to contact
job_data_dir - the directory for SDK jobs per user
input_queue - queue for the workspace downloader tasks
output_dir - the directory for a specific workspace id under sourcedata/ws
retrieve_sample - whether to retrieve sample for each genome object
ignore_no_sample_error - whether to ignore the error when no sample data is found
ws - workspace client
asu - assemblyUtil client
ss - sampleService client
pools - a pool of worker processes

"""
def __init__(
self,
job_dir: str,
output_dir: str,
worker_function: Callable,
kb_base_url: str = "https://ci.kbase.us/services/",
token_filepath: str | None = None,
workers: int = 5,
retrieve_sample: bool = False,
ignore_no_sample_error: bool = False,
self,
job_dir: str,
output_dir: str | None = None,
kb_base_url: str = "https://ci.kbase.us/services/",
token_filepath: str | None = None,
au_service_ver: str = "release",
workers: int = 5,
max_callback_server_tasks: int = 20,
worker_function: Callable | None = None,
retrieve_sample: bool = False,
ignore_no_sample_error: bool = False,
workspace_downloader: bool = False,
catalog_admin: bool = False,
) -> None:
"""
Initialize the configuration class.

Args:
job_dir (str): The directory for SDK jobs per user.
output_dir (str): The directory for a specific workspace id under sourcedata/ws.
worker_function (Callable): The function that will be called by the workers.
kb_base_url (str): The base url of the KBase services.
token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment.
token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment.
If not supplied, the token must be provided in the environment variable KB_AUTH_TOKEN.
The KB_ADMIN_AUTH_TOKEN environment variable will get set by this token if the user runs as catalog admin.
au_service_ver (str): The service version of AssemblyUtilClient
('dev', 'beta', 'release', or a git commit).
workers (int): The number of workers to use for multiprocessing.
max_callback_server_tasks (int): The maximum number of subtasks for the callback server.
worker_function (Callable): The function that will be called by the workers.
retrieve_sample (bool): Whether to retrieve sample for each genome object.
ignore_no_sample_error (bool): Whether to ignore the error when no sample data is found.
workspace_downloader (bool): Whether to be used for the workspace downloader script.
catalog_admin (bool): Whether to run the callback server as catalog admin.
"""
port = loader_helper.find_free_port()
token = loader_helper.get_token(token_filepath)
self.retrieve_sample = retrieve_sample
self.ignore_no_sample_error = ignore_no_sample_error
ipv4 = loader_helper.get_ip()
port = loader_helper.find_free_port()

# common instance variables

self.token = loader_helper.get_token(token_filepath)

# setup and run callback server container
self._start_callback_server(
docker.from_env(),
uuid.uuid4().hex,
job_dir,
kb_base_url,
token,
self.token,
port,
max_callback_server_tasks,
ipv4,
catalog_admin,
)

ws_url = os.path.join(kb_base_url, "ws")
sample_url = os.path.join(kb_base_url, "sampleservice")
callback_url = "http://" + ipv4 + ":" + str(port)
print("callback_url:", callback_url)

self.ws = Workspace(ws_url, token=token)
self.asu = AssemblyUtil(callback_url, token=token)
self.ss = SampleService(sample_url, token=token)
self.callback_url = "http://" + ipv4 + ":" + str(port)
print("callback_url:", self.callback_url)

self.workers = workers
self.output_dir = output_dir
self.input_queue = Queue()
self.output_queue = Queue()
self.job_data_dir = loader_helper.make_job_data_dir(job_dir)
self.pools = Pool(workers, worker_function, [self])

# unique to downloader
if workspace_downloader:
if worker_function is None:
raise ValueError(
"worker_function cannot be None for the workspace downloader script"
)

self.input_queue = Queue()
self.output_dir = output_dir

self.retrieve_sample = retrieve_sample
self.ignore_no_sample_error = ignore_no_sample_error

ws_url = os.path.join(kb_base_url, "ws")
self.ws = Workspace(ws_url, token=self.token)

self.asu = AssemblyUtil(
self.callback_url, service_ver=au_service_ver, token=self.token
)

sample_url = os.path.join(kb_base_url, "sampleservice")
self.ss = SampleService(sample_url, token=self.token)

self.pools = Pool(workers, worker_function, [self])
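
The constructor above builds the workspace and sample service endpoints with `os.path.join` and the callback URL by string concatenation. `os.path.join` gives the expected result here because the default `kb_base_url` ends in `/` and the scripts run on POSIX systems, but it is a filesystem helper rather than a URL helper. A minimal sketch of the same construction with `urllib.parse.urljoin` and an f-string follows; the helper names `service_url` and `callback_url` are hypothetical and not part of this PR.

```python
from urllib.parse import urljoin


def service_url(kb_base_url: str, service: str) -> str:
    """Join a KBase base URL and a service name (hypothetical helper)."""
    # urljoin replaces the last path segment unless the base ends with "/",
    # so normalize the base first.
    base = kb_base_url if kb_base_url.endswith("/") else kb_base_url + "/"
    return urljoin(base, service)


def callback_url(ipv4: str, port: int) -> str:
    """Build the callback server URL from an IPv4 address and a port."""
    return f"http://{ipv4}:{port}"
```

With the default base URL, `service_url("https://ci.kbase.us/services/", "ws")` yields `https://ci.kbase.us/services/ws`, matching what the `os.path.join` call produces on POSIX.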

def _setup_callback_server_envs(
self,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
ipv4: str,
self,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
max_callback_server_tasks: int,
ipv4: str,
catalog_admin: bool,
) -> Tuple[dict[str, Union[int, str]], dict[str, dict[str, str]]]:
"""
Setup the environment variables and volumes for the callback server.
@@ -89,6 +135,9 @@
kb_base_url (str): The base url of the KBase services.
token (str): The KBase token.
port (int): The port number for the callback server.
max_callback_server_tasks (int): The maximum number of subtasks for the callback server.
ipv4 (str): The ipv4 address for the callback server.
catalog_admin (bool): Whether to run the callback server as catalog admin.

Returns:
tuple: A tuple of the environment variables and volumes for the callback server.
@@ -102,9 +151,14 @@
env["KB_BASE_URL"] = kb_base_url
env["JOB_DIR"] = job_dir
env["CALLBACK_PORT"] = port
env["JR_MAX_TASKS"] = max_callback_server_tasks
env["CALLBACK_IP"] = ipv4 # specify an ipv4 address for the callback server
# otherwise, the callback container will use an ipv6 address

# set admin token to get catalog secure params
if catalog_admin:
env["KB_ADMIN_AUTH_TOKEN"] = token

# setup volumes required for docker container
docker_host = os.environ["DOCKER_HOST"]
if docker_host.startswith("unix:"):
@@ -116,14 +170,16 @@
return env, vol

def _start_callback_server(
self,
client: docker.client,
container_name: str,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
ipv4: str,
self,
client: docker.client,
container_name: str,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
max_callback_server_tasks: int,
ipv4: str,
catalog_admin: bool,
) -> None:
"""
Start the callback server.
@@ -134,9 +190,20 @@
job_dir (str): The directory for SDK jobs per user.
kb_base_url (str): The base url of the KBase services.
token (str): The KBase token.
max_callback_server_tasks (int): The maximum number of subtasks for the callback server.
port (int): The port number for the callback server.
ipv4 (str): The ipv4 address for the callback server.
catalog_admin (bool): Whether to run the callback server as catalog admin.
"""
env, vol = self._setup_callback_server_envs(job_dir, kb_base_url, token, port, ipv4)
env, vol = self._setup_callback_server_envs(
job_dir,
kb_base_url,
token,
port,
max_callback_server_tasks,
ipv4,
catalog_admin,
)
self.container = client.containers.run(
name=container_name,
image=CALLBACK_IMAGE_NAME,
@@ -147,9 +214,21 @@
)
time.sleep(2)

def _get_container_logs(self) -> None:
"""
Get logs from the callback server container.
"""
logs = self.container.logs()
if logs:
print("\n****** Logs from the Callback Server ******\n")
logs = logs.decode("utf-8")
for line in logs.split("\n"):
print(line)
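
The decoding step in `_get_container_logs` can be separated from the printing so that it can be exercised without a running container. The sketch below assumes the raw logs arrive as `bytes`, as the `decode` call above implies; the helper name `container_log_lines` is hypothetical, and the use of `errors="replace"` and `splitlines()` is a suggested variation rather than what the PR does.

```python
def container_log_lines(raw_logs: bytes) -> list[str]:
    """Decode raw container log bytes into lines (hypothetical helper)."""
    if not raw_logs:
        return []
    # errors="replace" avoids a UnicodeDecodeError if the container
    # emitted bytes that are not valid UTF-8.
    return raw_logs.decode("utf-8", errors="replace").splitlines()
```

Unlike `split("\n")`, `splitlines()` does not produce a trailing empty string when the log ends with a newline.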

def stop_callback_server(self) -> None:
"""
Stop the callback server.
"""
self._get_container_logs()

self.container.stop()
self.container.remove()
7 changes: 7 additions & 0 deletions src/loaders/common/loader_common_names.py
@@ -129,6 +129,13 @@
'APPDEV': 'https://appdev.kbase.us/services/',
'PROD': 'https://kbase.us/services/'}

# containers.conf path
CONTAINERS_CONF_PATH = "~/.config/containers/containers.conf"
# params in containers.conf file
CONTAINERS_CONF_PARAMS = {
"seccomp_profile": "\"unconfined\"",
"log_driver": "\"k8s-file\""
}
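
The escaped quotes inside the `CONTAINERS_CONF_PARAMS` values are deliberate: `containers.conf` is a TOML file, where string values must be quoted, while `configparser` writes values verbatim. The sketch below shows what `configparser` emits for these parameters; the function name `render_containers_section` is hypothetical and exists only for illustration.

```python
import configparser
import io

# Same keys and values as CONTAINERS_CONF_PARAMS in the diff above.
CONF_PARAMS = {
    "seccomp_profile": '"unconfined"',
    "log_driver": '"k8s-file"',
}


def render_containers_section(params: dict[str, str]) -> str:
    """Render a [containers] section the way configparser writes it."""
    config = configparser.ConfigParser()
    config.add_section("containers")
    for key, val in params.items():
        config.set("containers", key, val)
    buffer = io.StringIO()
    config.write(buffer)
    return buffer.getvalue()
```

The rendered text contains `seccomp_profile = "unconfined"` and `log_driver = "k8s-file"`; without the embedded quotes the values would be written bare, which is not valid TOML for strings.
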
# field name for Kbase object metadata
FLD_KB_OBJ_UPA = "upa"
FLD_KB_OBJ_NAME = "name"
Expand Down
60 changes: 57 additions & 3 deletions src/loaders/common/loader_helper.py
@@ -1,4 +1,6 @@
import argparse
import configparser
import fcntl
import itertools
import json
import os
@@ -26,6 +28,8 @@
)
from src.loaders.common.loader_common_names import (
COLLECTION_SOURCE_DIR,
CONTAINERS_CONF_PARAMS,
CONTAINERS_CONF_PATH,
DOCKER_HOST,
FATAL_ERROR,
FATAL_STACKTRACE,
@@ -298,6 +302,45 @@
return proc


def _get_containers_config(conf_path: str):
"""Get containers.conf file at home directory."""
config = configparser.ConfigParser()
config.read(conf_path)
return config


def is_config_modification_required():
"""Check whether the containers.conf file requires modification."""
conf_path = os.path.expanduser(CONTAINERS_CONF_PATH)
config = _get_containers_config(conf_path)
if not config.has_section("containers"):
return True
for key, val in CONTAINERS_CONF_PARAMS.items():
if config.get("containers", key, fallback=None) != val:
return True
return False
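
One way to make this check easy to unit test is to pass the path and the required parameters in as arguments instead of reading module-level constants. The sketch below follows the same logic under that assumption; the name `config_needs_update` is hypothetical and not part of the PR.

```python
import configparser


def config_needs_update(conf_path: str, required: dict[str, str]) -> bool:
    """Return True if conf_path lacks any required [containers] setting."""
    config = configparser.ConfigParser()
    # ConfigParser.read silently skips files that do not exist,
    # so a missing file is treated like an empty one.
    config.read(conf_path)
    if not config.has_section("containers"):
        return True
    return any(
        config.get("containers", key, fallback=None) != val
        for key, val in required.items()
    )
```

A test can then point the function at a temporary file rather than at `~/.config/containers/containers.conf`.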


def setup_callback_server_logs():
"""Set up containers.conf file for the callback server logs."""
conf_path = os.path.expanduser(CONTAINERS_CONF_PATH)
with open(conf_path, "w") as writer:
try:
fcntl.flock(writer.fileno(), fcntl.LOCK_EX)
config = _get_containers_config(conf_path)

if not config.has_section("containers"):
config.add_section("containers")

for key, val in CONTAINERS_CONF_PARAMS.items():
config.set("containers", key, val)

config.write(writer)
print(f"containers.conf is modified and saved to path: {conf_path}")
finally:
fcntl.flock(writer.fileno(), fcntl.LOCK_UN)
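
One detail worth noting in `setup_callback_server_logs` as written: `open(conf_path, "w")` truncates the file at open time, which happens before the lock is acquired and before `_get_containers_config` reads it, so any settings already present in the file appear to be discarded. A minimal sketch of an alternative ordering (open without truncating, lock, read, then rewrite) is shown below. It is an illustration under stated assumptions, not the PR's code: the function name `update_conf_locked` is hypothetical, and `fcntl` restricts it to Unix-like systems, as in the original.

```python
import configparser
import fcntl
import os


def update_conf_locked(conf_path: str, params: dict[str, str]) -> None:
    """Merge params into [containers] while holding an exclusive lock."""
    parent = os.path.dirname(conf_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # O_CREAT without O_TRUNC: create the file if missing, keep its contents.
    fd = os.open(conf_path, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+") as handle:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
        try:
            config = configparser.ConfigParser()
            config.read_file(handle)  # read existing settings under the lock
            if not config.has_section("containers"):
                config.add_section("containers")
            for key, val in params.items():
                config.set("containers", key, val)
            # Rewrite the file in place only after the old contents are parsed.
            handle.seek(0)
            handle.truncate()
            config.write(handle)
            handle.flush()
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
```

Because the read and the write both happen while the lock is held, two processes starting at the same time cannot interleave their updates, and settings in other sections survive the rewrite.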


def is_upa_info_complete(upa_dir: str):
"""
Check whether an UPA needs to be downloaded or not by loading the metadata file.
@@ -430,19 +473,30 @@
os.link(target_file, new_file)


def list_objects(wsid, conf, object_type, include_metadata=False, batch_size=10000):
def list_objects(wsid, ws, object_type, include_metadata=False, batch_size=10000):
"""
List all objects information given a workspace ID.

Args:
wsid (int): Target workspace addressed by the permanent ID
ws (Workspace): Workspace client
object_type (str): Type of the objects to be listed
include_metadata (boolean): Whether to include the user provided metadata in the returned object_info
batch_size (int): Number of objects to process in each batch

Returns:
list: a list of objects on the target workspace

"""
if batch_size > 10000:
raise ValueError("Maximum value for listing workspace objects is 10000")

maxObjectID = conf.ws.get_workspace_info({"id": wsid})[4]
maxObjectID = ws.get_workspace_info({"id": wsid})[4]
batch_input = [
[idx + 1, idx + batch_size] for idx in range(0, maxObjectID, batch_size)
]
objs = [
conf.ws.list_objects(
ws.list_objects(
_list_objects_params(wsid, min_id, max_id, object_type, include_metadata)
)
for min_id, max_id in batch_input
Expand Down
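
The batching in `list_objects` splits the object ID space `1..maxObjectID` into consecutive windows of at most `batch_size` IDs, and each window becomes one `ws.list_objects` call. The sketch below isolates that range computation; the helper name `batch_ranges` and the lower-bound check on `batch_size` are additions for illustration, not part of the PR.

```python
def batch_ranges(max_object_id: int, batch_size: int = 10000) -> list[tuple[int, int]]:
    """Return inclusive (min_id, max_id) windows covering 1..max_object_id."""
    if batch_size > 10000:
        raise ValueError("Maximum value for listing workspace objects is 10000")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")  # added guard
    return [
        (start + 1, start + batch_size)
        for start in range(0, max_object_id, batch_size)
    ]
```

For a workspace whose highest object ID is 25000, this gives three windows: (1, 10000), (10001, 20000), and (20001, 30000). The last window extends past the highest ID, as it does in the original code, which should be harmless because the upper bound only acts as a filter.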