Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Callback server wrapper #554

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,89 @@
"""
Configuration class for the workspace downloader and workspace uploader scripts.
"""

def __init__(
self,
job_dir: str,
output_dir: str,
worker_function: Callable,
kb_base_url: str = "https://ci.kbase.us/services/",
token_filepath: str | None = None,
workers: int = 5,
retrieve_sample: bool = False,
ignore_no_sample_error: bool = False,
self,
job_dir: str,
output_dir: str,
kb_base_url: str = "https://ci.kbase.us/services/",
token_filepath: str | None = None,
au_service_ver: str = "release",
workers: int = 5,
max_task: int = 20,
worker_function: Callable | None = None,
retrieve_sample: bool = False,
ignore_no_sample_error: bool = False,
workspace_downloader: bool = True,
) -> None:
"""
Initialize the configuration class.

Args:
job_dir (str): The directory for SDK jobs per user.
output_dir (str): The directory for a specific workspace id under sourcedata/ws.
worker_function (Callable): The function that will be called by the workers.
kb_base_url (str): The base url of the KBase services.
token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment.
token_filepath (str): The file path that stores a KBase token appropriate for the KBase environment.
If not supplied, the token must be provided in the environment variable KB_AUTH_TOKEN.
au_service_ver (str): The service version of AssemblyUtilClient.
workers (int): The number of workers to use for multiprocessing.
max_task (int): The maximum number of subtasks for the callback server.
worker_function (Callable): The function that will be called by the workers.
retrieve_sample (bool): Whether to retrieve sample for each genome object.
ignore_no_sample_error (bool): Whether to ignore the error when no sample data is found.
workspace_downloader (bool): Whether to be used for the workspace downloader script.
"""
# common instance variables
ipv4 = loader_helper.get_ip()

Check warning on line 53 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L53

Added line #L53 was not covered by tests
port = loader_helper.find_free_port()
token = loader_helper.get_token(token_filepath)
self.retrieve_sample = retrieve_sample
self.ignore_no_sample_error = ignore_no_sample_error
ipv4 = loader_helper.get_ip()

ws_url = os.path.join(kb_base_url, "ws")
callback_url = "http://" + ipv4 + ":" + str(port)
print("callback_url:", callback_url)

Check warning on line 59 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L57-L59

Added lines #L57 - L59 were not covered by tests

self._start_callback_server(
docker.from_env(),
uuid.uuid4().hex,
job_dir,
kb_base_url,
token,
port,
max_task,
ipv4,
)

ws_url = os.path.join(kb_base_url, "ws")
sample_url = os.path.join(kb_base_url, "sampleservice")
callback_url = "http://" + ipv4 + ":" + str(port)
print("callback_url:", callback_url)

self.ws = Workspace(ws_url, token=token)
self.asu = AssemblyUtil(callback_url, token=token)
self.ss = SampleService(sample_url, token=token)
self.asu = AssemblyUtil(callback_url, service_ver=au_service_ver, token=token)

Check warning on line 73 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L73

Added line #L73 was not covered by tests

self.workers = workers
self.output_dir = output_dir
self.input_queue = Queue()
self.output_queue = Queue()
self.job_data_dir = loader_helper.make_job_data_dir(job_dir)
self.pools = Pool(workers, worker_function, [self])

self.logging = None

Check warning on line 78 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L78

Added line #L78 was not covered by tests

# unique to downloader
if workspace_downloader:
if worker_function is None:
raise ValueError(

Check warning on line 83 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L81-L83

Added lines #L81 - L83 were not covered by tests
"worker_function cannot be None for the workspace downloader script"
)
self.input_queue = Queue()
self.retrieve_sample = retrieve_sample
self.ignore_no_sample_error = ignore_no_sample_error

Check warning on line 88 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L86-L88

Added lines #L86 - L88 were not covered by tests

sample_url = os.path.join(kb_base_url, "sampleservice")
self.ss = SampleService(sample_url, token=token)

Check warning on line 91 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L90-L91

Added lines #L90 - L91 were not covered by tests

self.pools = Pool(workers, worker_function, [self])

Check warning on line 93 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L93

Added line #L93 was not covered by tests

def _setup_callback_server_envs(
self,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
ipv4: str,
self,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
max_task: int,
ipv4: str,
) -> Tuple[dict[str, Union[int, str]], dict[str, dict[str, str]]]:
"""
Setup the environment variables and volumes for the callback server.
Expand All @@ -89,6 +109,8 @@
kb_base_url (str): The base url of the KBase services.
token (str): The KBase token.
port (int): The port number for the callback server.
max_task (int): The maximum number of subtasks for the callback server.
ipv4 (str): The ipv4 address for the callback server.

Returns:
tuple: A tuple of the environment variables and volumes for the callback server.
Expand All @@ -99,9 +121,11 @@

# used by the callback server
env["KB_AUTH_TOKEN"] = token
env["KB_ADMIN_AUTH_TOKEN"] = token # pass in admin_token to get catalog params

Check warning on line 124 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L124

Added line #L124 was not covered by tests
env["KB_BASE_URL"] = kb_base_url
env["JOB_DIR"] = job_dir
env["CALLBACK_PORT"] = port
env["JR_MAX_TASKS"] = max_task

Check warning on line 128 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L128

Added line #L128 was not covered by tests
env["CALLBACK_IP"] = ipv4 # specify an ipv4 address for the callback server
# otherwise, the callback container will use an ipv6 address

Expand All @@ -116,14 +140,15 @@
return env, vol

def _start_callback_server(
self,
client: docker.client,
container_name: str,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
ipv4: str,
self,
client: docker.client,
container_name: str,
job_dir: str,
kb_base_url: str,
token: str,
port: int,
max_task: int,
ipv4: str,
) -> None:
"""
Start the callback server.
Expand All @@ -134,9 +159,13 @@
job_dir (str): The directory for SDK jobs per user.
kb_base_url (str): The base url of the KBase services.
token (str): The KBase token.
max_task (int): The maximum number of subtasks for the callback server.
port (int): The port number for the callback server.
ipv4 (str): The ipv4 address for the callback server.
"""
env, vol = self._setup_callback_server_envs(job_dir, kb_base_url, token, port, ipv4)
env, vol = self._setup_callback_server_envs(

Check warning on line 166 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L166

Added line #L166 was not covered by tests
job_dir, kb_base_url, token, port, max_task, ipv4
)
self.container = client.containers.run(
name=container_name,
image=CALLBACK_IMAGE_NAME,
Expand All @@ -151,5 +180,6 @@
"""
Stop the callback server.
"""
self.logging = self.container.logs()

Check warning on line 183 in src/loaders/common/callback_server_wrapper.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/common/callback_server_wrapper.py#L183

Added line #L183 was not covered by tests
self.container.stop()
self.container.remove()
3 changes: 3 additions & 0 deletions src/loaders/common/loader_common_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,6 @@
'NEXT': 'https://next.kbase.us/services/',
'APPDEV': 'https://appdev.kbase.us/services/',
'PROD': 'https://kbase.us/services/'}

# service_vers
SERVICE_VERSIONS = ["dev", "beta", "release"]
33 changes: 29 additions & 4 deletions src/loaders/workspace_downloader/workspace_downloader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION] [--root_dir ROOT_DIR]
[--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH] [--keep_job_dir] [--retrieve_sample] [--ignore_no_sample_error]
usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION]
[--root_dir ROOT_DIR] [--env {CI,NEXT,APPDEV,PROD}] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH]
[--jr_max_tasks JR_MAX_TASKS] [--au_service_ver {dev,beta,release}] [--keep_job_dir] [--retrieve_sample]
[--ignore_no_sample_error]

PROTOTYPE - Download genome files from the workspace service (WSS).

Expand All @@ -22,6 +24,10 @@
--workers WORKERS Number of workers for multiprocessing (default: 5)
--token_filepath TOKEN_FILEPATH
A file path that stores KBase token
--jr_max_tasks JR_MAX_TASKS
The maxmium subtasks for the callback server (default: 20)
--au_service_ver {dev,beta,release}
The service version of AssemblyUtil client (default: release)
--keep_job_dir Keep SDK job directory after download task is completed
--retrieve_sample Retrieve sample for each genome object
--ignore_no_sample_error
Expand Down Expand Up @@ -55,7 +61,7 @@

import src.common.storage.collection_and_field_names as names
from src.loaders.common import loader_common_names, loader_helper
from src.loaders.workspace_downloader.workspace_downloader_helper import Conf
from src.loaders.common.callback_server_wrapper import Conf

# setup KB_AUTH_TOKEN as env or provide a token_filepath in --token_filepath
# export KB_AUTH_TOKEN="your-kb-auth-token"
Expand Down Expand Up @@ -373,6 +379,19 @@
type=str,
help="A file path that stores KBase token",
)
optional.add_argument(

Check warning on line 382 in src/loaders/workspace_downloader/workspace_downloader.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/workspace_downloader/workspace_downloader.py#L382

Added line #L382 was not covered by tests
"--jr_max_tasks",
type=int,
default=20,
help="The maxmium subtasks for the callback server",
)
optional.add_argument(

Check warning on line 388 in src/loaders/workspace_downloader/workspace_downloader.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/workspace_downloader/workspace_downloader.py#L388

Added line #L388 was not covered by tests
"--au_service_ver",
type=str,
choices=loader_common_names.SERVICE_VERSIONS,
default="release",
help="The service version of AssemblyUtil client",
)
optional.add_argument(
"--keep_job_dir",
action="store_true",
Expand All @@ -398,6 +417,8 @@
env = args.env
workers = args.workers
token_filepath = args.token_filepath
max_task = args.jr_max_tasks
au_service_ver = args.au_service_ver

Check warning on line 421 in src/loaders/workspace_downloader/workspace_downloader.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/workspace_downloader/workspace_downloader.py#L420-L421

Added lines #L420 - L421 were not covered by tests
keep_job_dir = args.keep_job_dir
retrieve_sample = args.retrieve_sample
ignore_no_sample_error = args.ignore_no_sample_error
Expand All @@ -410,6 +431,8 @@
)
if workspace_id <= 0:
parser.error(f"workspace_id needs to be > 0")
if max_task <= 0:
parser.error(f"jr_max_tasks needs to be > 0")

Check warning on line 435 in src/loaders/workspace_downloader/workspace_downloader.py

View check run for this annotation

Codecov / codecov/patch

src/loaders/workspace_downloader/workspace_downloader.py#L434-L435

Added lines #L434 - L435 were not covered by tests
if workers < 1 or workers > cpu_count():
parser.error(f"minimum worker is 1 and maximum worker is {cpu_count()}")

Expand Down Expand Up @@ -438,10 +461,12 @@
conf = Conf(
job_dir,
output_dir,
_process_input,
kb_base_url,
token_filepath,
au_service_ver,
workers,
max_task,
_process_input,
retrieve_sample,
ignore_no_sample_error,
)
Expand Down
2 changes: 1 addition & 1 deletion src/loaders/workspace_uploader/workspace_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import yaml

from src.loaders.common import loader_common_names, loader_helper
from src.loaders.workspace_downloader.workspace_downloader_helper import Conf
from src.loaders.common.callback_server_wrapper import Conf

# setup KB_AUTH_TOKEN as env or provide a token_filepath in --token_filepath
# export KB_AUTH_TOKEN="your-kb-auth-token"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from src.clients.AssemblyUtilClient import AssemblyUtil
from src.loaders.common import loader_helper
from src.loaders.workspace_downloader.workspace_downloader_helper import Conf
from src.loaders.common.callback_server_wrapper import Conf
from src.loaders.workspace_uploader import workspace_uploader

ASSEMBLY_DIR_NAMES = ["GCF_000979855.1", "GCF_000979175.1"]
Expand Down
Loading