Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RE2022-272: Update GenomeFileUtil to run in batch & parallelization 2 #207

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions GenomeFileUtil.spec
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,25 @@ module GenomeFileUtil {
funcdef save_one_genome(SaveOneGenomeParams params)
returns (SaveGenomeResult returnVal) authentication required;

typedef structure {
string name;
KBaseGenomes.Genome data;
boolean hidden;
boolean upgrade;
} GenomeInput;

typedef structure {
int workspace_id;
list<GenomeInput> inputs;
} SaveGenomesParams;

typedef structure {
list<SaveGenomeResult> results;
} SaveGenomesResults;

funcdef save_genomes(SaveGenomesParams params)
returns(SaveGenomesResults results) authentication required;

/*
gff_file - object containing path to gff_file
ws_ref - input Assembly or Genome reference
Expand Down
17 changes: 17 additions & 0 deletions lib/GenomeFileUtil/GenomeFileUtilImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,23 @@ def save_one_genome(self, ctx, params):
# return the results
return [returnVal]

def save_genomes(self, ctx, params):
"""This function is the mass function of save_one_genome"""
# ctx is the context object
# return variables are: results
#BEGIN save_genomes
results = {
"results": GenomeInterface(self.cfg).save_genome_mass(params)
}
#END save_genomes

# At some point might do deeper type checking...
if not isinstance(results, dict):
raise ValueError('Method save_genomes return value ' +
'results is not type dict as required.')
# return the results
return [results]

def ws_obj_gff_to_genome(self, ctx, params):
"""
This function takes in a workspace object of type KBaseGenomes.Genome or KBaseGenomeAnnotations.Assembly and a gff file and produces a KBaseGenomes.Genome reanotated according to the the input gff file.
Expand Down
4 changes: 4 additions & 0 deletions lib/GenomeFileUtil/GenomeFileUtilServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ def __init__(self):
name='GenomeFileUtil.save_one_genome',
types=[dict])
self.method_authentication['GenomeFileUtil.save_one_genome'] = 'required' # noqa
self.rpc_service.add(impl_GenomeFileUtil.save_genomes,
name='GenomeFileUtil.save_genomes',
types=[dict])
self.method_authentication['GenomeFileUtil.save_genomes'] = 'required' # noqa
self.rpc_service.add(impl_GenomeFileUtil.ws_obj_gff_to_genome,
name='GenomeFileUtil.ws_obj_gff_to_genome',
types=[dict])
Expand Down
204 changes: 149 additions & 55 deletions lib/GenomeFileUtil/core/GenomeInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
MAX_THREADS_DEFAULT = 10
THREADS_PER_CPU_DEFAULT = 1

_WSID = 'workspace_id'
_INPUTS = 'inputs'

class GenomeInterface:
def __init__(self, config):
Expand All @@ -36,18 +38,15 @@
self.scratch = config.raw['scratch']
self.ws_large_data = WsLargeDataIO(self.callback_url)

@staticmethod
def _validate_save_one_genome_params(params):
def _validate_genome_input_params(self, genome_input):
"""
_validate_save_one_genome_params:
validates params passed to save_one_genome method
Check required parameters are in genome_input
"""
logging.info('start validating save_one_genome params')
logging.info('start validating genome inputs params')
# check for required parameters
for p in ['workspace', 'name', 'data']:
if p not in params:
raise ValueError(
'"{}" parameter is required, but missing'.format(p))
for p in ['name', 'data']:
if p not in genome_input:
raise ValueError(f"{p} parameter is required, but missing")

Check warning on line 49 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L49

Added line #L49 was not covered by tests

def _check_shock_response(self, response, errtxt):
"""
Expand Down Expand Up @@ -132,54 +131,141 @@
return data, res['info']
# return self.dfu.get_objects(params)['data'][0]

def save_one_genome(self, params):
def _save_genome_mass(self, params):
logging.info('start saving genome object')
self._validate_save_one_genome_params(params)
workspace = params['workspace']
name = params['name']
data = params['data']
# XXX there is no `workspace_datatype` param in the spec
ws_datatype = params.get('workspace_datatype', "KBaseGenomes.Genome")
# XXX there is no `meta` param in the spec
meta = params.get('meta', {})
if "AnnotatedMetagenomeAssembly" in ws_datatype:
if params.get('upgrade') or 'feature_counts' not in data:
data = self._update_metagenome(data)
else:
if params.get('upgrade') or 'feature_counts' not in data:
data = self._update_genome(data)

# check all handles point to shock nodes owned by calling user
self._own_handle(data, 'genbank_handle_ref')
self._own_handle(data, 'gff_handle_ref')
if "AnnotatedMetagenomeAssembly" not in ws_datatype:
self._check_dna_sequence_in_features(data)
data['warnings'] = self.validate_genome(data)

# sort data
data = GenomeUtils.sort_dict(data)
# dump genome to scratch for upload
data_path = os.path.join(self.scratch, name + ".json")
json.dump(data, open(data_path, 'w'))
if 'hidden' in params and str(params['hidden']).lower() in ('yes', 'true', 't', '1'):
hidden = 1
else:
hidden = 0
workspace_id = params[_WSID]
input_files = params[_INPUTS]

ws_datatypes = []
data_paths = []
names = []
meta_data = []
hidden_data = []
warnings = []

for input_file in input_files:

# retrive required params
name = input_file['name']
data = input_file['data']

# XXX there is no `workspace_datatype` param in the spec
ws_datatype = input_file.get('workspace_datatype', "KBaseGenomes.Genome")
# XXX there is no `meta` param in the spec
meta = input_file.get('meta', {})

ws_datatypes.append(ws_datatype)
names.append(name)
meta_data.append(meta)

if "AnnotatedMetagenomeAssembly" in ws_datatype:
if input_file.get('upgrade') or 'feature_counts' not in data:
data = self._update_metagenome(data)

Check warning on line 163 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L162-L163

Added lines #L162 - L163 were not covered by tests
else:
if input_file.get('upgrade') or 'feature_counts' not in data:
data = self._update_genome(data)

# check all handles point to shock nodes owned by calling user
self._own_handle(data, 'genbank_handle_ref')
self._own_handle(data, 'gff_handle_ref')
if "AnnotatedMetagenomeAssembly" not in ws_datatype:
self._check_dna_sequence_in_features(data)
data['warnings'] = self.validate_genome(data)

# sort data
data = GenomeUtils.sort_dict(data)
# dump genome to scratch for upload
data_path = os.path.join(self.scratch, name + ".json")
json.dump(data, open(data_path, 'w'))
if 'hidden' in params and str(params['hidden']).lower() in ('yes', 'true', 't', '1'):
hidden = 1

Check warning on line 181 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L181

Added line #L181 was not covered by tests
else:
hidden = 0

data_paths.append(data_path)
hidden_data.append(hidden)
warnings.append(data['warnings'])

dfu_infos = self._save_genome_objects(
workspace_id,
ws_datatypes,
data_paths,
names,
meta_data,
hidden_data,
)

output = [
{'info': dfu_oi, 'warnings': warning}
for dfu_oi, warning in zip(dfu_infos, warnings)
]
return output

if isinstance(workspace, int) or workspace.isdigit():
workspace_id = workspace
else:
workspace_id = self.dfu.ws_name_to_id(workspace)

save_params = {'id': workspace_id,
'objects': [{'type': ws_datatype,
'data_json_file': data_path,
'name': name,
'meta': meta,
'hidden': hidden}]}
dfu_oi = self.ws_large_data.save_objects(save_params)[0]
returnVal = {'info': dfu_oi, 'warnings': data.get('warnings', [])}
return returnVal
def save_one_genome(self, params):
print('validating parameters')
mass_params = self._set_up_single_params(params)
return self._save_genome_mass(mass_params)[0]

def save_genome_mass(self, params):
print('validating parameters')
self._validate_mass_params(params)
return self._save_genome_mass(params)

Check warning on line 212 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L210-L212

Added lines #L210 - L212 were not covered by tests

def _set_up_single_params(self, params):
inputs = dict(params)
self._validate_genome_input_params(inputs)
ws_id = self._get_int(inputs.pop(_WSID, None), _WSID)
ws_name = inputs.pop('workspace', None)
if (bool(ws_id) == bool(ws_name)): # xnor
raise ValueError(f"Exactly one of a '{_WSID}' or a 'workspace' parameter must be provided")
if not ws_id:
print(f"Translating workspace name {ws_name} to a workspace ID. Prefer submitting "
+ "a workspace ID over a mutable workspace name that may cause race conditions")
ws_id = self.dfu.ws_name_to_id(ws_name)
mass_params = {_WSID: ws_id, _INPUTS: [inputs]}
return mass_params

def _validate_mass_params(self, params):
ws_id = self._get_int(params.get(_WSID), _WSID)
if not ws_id:
raise ValueError(f"{_WSID} is required")
inputs = params.get(_INPUTS)
if not inputs or type(inputs) != list:
raise ValueError(f"{_INPUTS} field is required and must be a non-empty list")
for i, inp in enumerate(inputs, start=1):
if type(inp) != dict:
raise ValueError(f"Entry #{i} in {_INPUTS} field is not a mapping as required")

Check warning on line 237 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L229-L237

Added lines #L229 - L237 were not covered by tests
# check required params in genome input
self._validate_genome_input_params(inp)

Check warning on line 239 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L239

Added line #L239 was not covered by tests

def _save_genome_objects(
self,
workspace_id,
ws_datatypes,
data_paths,
names,
meta_data,
hidden_data,
):
print('Saving Genomes to Workspace')
sys.stdout.flush()
ws_inputs = []

for ws_datatype, data_path, name, meta, hidden in zip(
ws_datatypes, data_paths, names, meta_data, hidden_data
):
ws_inputs.append(
{
'type': ws_datatype,
'data_json_file': data_path,
'name': name,
'meta': meta,
'hidden': hidden,
}
)
return self.ws_large_data.save_objects(
{'id': workspace_id, 'objects': ws_inputs}
)

@staticmethod
def determine_tier(source):
Expand Down Expand Up @@ -436,3 +522,11 @@
f"permitted size of {sizeof_fmt(MAX_GENOME_SIZE)}.\n"
f"Here is the breakdown for feature lists and their respective "
f"sizes:\n{master_key_sizes}")

def _get_int(self, putative_int, name, minimum=1):
if putative_int is not None:
if type(putative_int) != int:
raise ValueError(f"{name} must be an integer, got: {putative_int}")
if putative_int < minimum:
raise ValueError(f"{name} must be an integer >= {minimum}")

Check warning on line 531 in lib/GenomeFileUtil/core/GenomeInterface.py

View check run for this annotation

Codecov / codecov/patch

lib/GenomeFileUtil/core/GenomeInterface.py#L528-L531

Added lines #L528 - L531 were not covered by tests
return putative_int
2 changes: 1 addition & 1 deletion scripts/run_tests_within_container.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export PYTHONPATH=$script_dir/../lib:$PYTHONPATH
export PYTHONPATH=$script_dir/../test:$PYTHONPATH

# Set TEST_PATH to run a specific test. Eg: TEST_PATH=test.core.update_taxon_assignments_test
export TEST_PATH=.
export TEST_PATH=test.problematic_tests.save_genome_test

cd $script_dir/../test
python -m nose --with-coverage --cover-package=GenomeFileUtil --cover-html --cover-html-dir=/kb/module/work/test_coverage --cover-xml --cover-xml-file=/kb/module/work/test_coverage/coverage.xml --nocapture --nologcapture $TEST_PATH
Expand Down
8 changes: 5 additions & 3 deletions test/problematic_tests/save_genome_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ def setUpClass(cls):

suffix = int(time.time() * 1000)
cls.wsName = "test_SaveGenomeTest_" + str(suffix)
cls.wsClient.create_workspace({'workspace': cls.wsName})

ws_info = cls.wsClient.create_workspace({'workspace': cls.wsName})
print("ws_info is: ")
print(ws_info)
cls.nodes_to_delete = []
cls.prepare_data()

Expand Down Expand Up @@ -163,7 +164,8 @@ def test_bad_one_genome_params(self):
invalidate_params = {'missing_workspace': 'workspace',
'name': 'name',
'data': 'data'}
error_msg = '"workspace" parameter is required, but missing'
error_msg = "Exactly one of a 'workspace_id' or " \
"a 'workspace' parameter must be provided"
self.fail_save_one_genome(invalidate_params, error_msg)

def test_one_genome(self):
Expand Down
Loading