diff --git a/GenomeFileUtil.spec b/GenomeFileUtil.spec index d89e9a53..f83544ee 100644 --- a/GenomeFileUtil.spec +++ b/GenomeFileUtil.spec @@ -292,6 +292,25 @@ module GenomeFileUtil { funcdef save_one_genome(SaveOneGenomeParams params) returns (SaveGenomeResult returnVal) authentication required; + typedef structure { + string name; + KBaseGenomes.Genome data; + boolean hidden; + boolean upgrade; + } GenomeInput; + + typedef structure { + int workspace_id; + list inputs; + } SaveGenomesParams; + + typedef structure { + list results; + } SaveGenomesResults; + + funcdef save_genomes(SaveGenomesParams params) + returns(SaveGenomesResults results) authentication required; + /* gff_file - object containing path to gff_file ws_ref - input Assembly or Genome reference diff --git a/lib/GenomeFileUtil/GenomeFileUtilImpl.py b/lib/GenomeFileUtil/GenomeFileUtilImpl.py index c4b84c11..30d4bd81 100644 --- a/lib/GenomeFileUtil/GenomeFileUtilImpl.py +++ b/lib/GenomeFileUtil/GenomeFileUtilImpl.py @@ -1166,6 +1166,23 @@ def save_one_genome(self, ctx, params): # return the results return [returnVal] + def save_genomes(self, ctx, params): + """This function is the mass function of save_one_genome""" + # ctx is the context object + # return variables are: results + #BEGIN save_genomes + results = { + "results": GenomeInterface(self.cfg).save_genome_mass(params) + } + #END save_genomes + + # At some point might do deeper type checking... + if not isinstance(results, dict): + raise ValueError('Method save_genomes return value ' + + 'results is not type dict as required.') + # return the results + return [results] + def ws_obj_gff_to_genome(self, ctx, params): """ This function takes in a workspace object of type KBaseGenomes.Genome or KBaseGenomeAnnotations.Assembly and a gff file and produces a KBaseGenomes.Genome reanotated according to the the input gff file. diff --git a/lib/GenomeFileUtil/core/GenomeInterface.py b/lib/GenomeFileUtil/core/GenomeInterface.py index b1aaa3ce..591185d4 100644 --- a/lib/GenomeFileUtil/core/GenomeInterface.py +++ b/lib/GenomeFileUtil/core/GenomeInterface.py @@ -20,6 +20,8 @@ MAX_THREADS_DEFAULT = 10 THREADS_PER_CPU_DEFAULT = 1 +_WSID = 'workspace_id' +_INPUTS = 'inputs' class GenomeInterface: def __init__(self, config): @@ -36,18 +38,15 @@ def __init__(self, config): self.scratch = config.raw['scratch'] self.ws_large_data = WsLargeDataIO(self.callback_url) - @staticmethod - def _validate_save_one_genome_params(params): + def _validate_genome_input_params(self, genome_input): """ - _validate_save_one_genome_params: - validates params passed to save_one_genome method + Check required parameters are in genome_input """ - logging.info('start validating save_one_genome params') + logging.info('start validating genome inputs params') # check for required parameters - for p in ['workspace', 'name', 'data']: - if p not in params: - raise ValueError( - '"{}" parameter is required, but missing'.format(p)) + for p in ['name', 'data']: + if p not in genome_input: + raise ValueError(f"{p} parameter is required, but missing") def _check_shock_response(self, response, errtxt): """ @@ -132,54 +131,141 @@ def get_one_genome(self, params): return data, res['info'] # return self.dfu.get_objects(params)['data'][0] - def save_one_genome(self, params): + def _save_genome_mass(self, params): logging.info('start saving genome object') - self._validate_save_one_genome_params(params) - workspace = params['workspace'] - name = params['name'] - data = params['data'] - # XXX there is no `workspace_datatype` param in the spec - ws_datatype = params.get('workspace_datatype', "KBaseGenomes.Genome") - # XXX there is no `meta` param in the spec - meta = params.get('meta', {}) - if "AnnotatedMetagenomeAssembly" in ws_datatype: - if params.get('upgrade') or 'feature_counts' not in data: - data = self._update_metagenome(data) - else: - if params.get('upgrade') or 'feature_counts' not in data: - data = self._update_genome(data) - - # check all handles point to shock nodes owned by calling user - self._own_handle(data, 'genbank_handle_ref') - self._own_handle(data, 'gff_handle_ref') - if "AnnotatedMetagenomeAssembly" not in ws_datatype: - self._check_dna_sequence_in_features(data) - data['warnings'] = self.validate_genome(data) - - # sort data - data = GenomeUtils.sort_dict(data) - # dump genome to scratch for upload - data_path = os.path.join(self.scratch, name + ".json") - json.dump(data, open(data_path, 'w')) - if 'hidden' in params and str(params['hidden']).lower() in ('yes', 'true', 't', '1'): - hidden = 1 - else: - hidden = 0 + workspace_id = params[_WSID] + input_files = params[_INPUTS] + + ws_datatypes = [] + data_paths = [] + names = [] + meta_data = [] + hidden_data = [] + warnings = [] + + for input_file in input_files: + + # retrive required params + name = input_file['name'] + data = input_file['data'] + + # XXX there is no `workspace_datatype` param in the spec + ws_datatype = input_file.get('workspace_datatype', "KBaseGenomes.Genome") + # XXX there is no `meta` param in the spec + meta = input_file.get('meta', {}) + + ws_datatypes.append(ws_datatype) + names.append(name) + meta_data.append(meta) + + if "AnnotatedMetagenomeAssembly" in ws_datatype: + if input_file.get('upgrade') or 'feature_counts' not in data: + data = self._update_metagenome(data) + else: + if input_file.get('upgrade') or 'feature_counts' not in data: + data = self._update_genome(data) + + # check all handles point to shock nodes owned by calling user + self._own_handle(data, 'genbank_handle_ref') + self._own_handle(data, 'gff_handle_ref') + if "AnnotatedMetagenomeAssembly" not in ws_datatype: + self._check_dna_sequence_in_features(data) + data['warnings'] = self.validate_genome(data) + + # sort data + data = GenomeUtils.sort_dict(data) + # dump genome to scratch for upload + data_path = os.path.join(self.scratch, name + ".json") + json.dump(data, open(data_path, 'w')) + if 'hidden' in params and str(params['hidden']).lower() in ('yes', 'true', 't', '1'): + hidden = 1 + else: + hidden = 0 + + data_paths.append(data_path) + hidden_data.append(hidden) + warnings.append(data['warnings']) + + dfu_infos = self._save_genome_objects( + workspace_id, + ws_datatypes, + data_paths, + names, + meta_data, + hidden_data, + ) + + output = [ + {'info': dfu_oi, 'warnings': warning} + for dfu_oi, warning in zip(dfu_infos, warnings) + ] + return output - if isinstance(workspace, int) or workspace.isdigit(): - workspace_id = workspace - else: - workspace_id = self.dfu.ws_name_to_id(workspace) - - save_params = {'id': workspace_id, - 'objects': [{'type': ws_datatype, - 'data_json_file': data_path, - 'name': name, - 'meta': meta, - 'hidden': hidden}]} - dfu_oi = self.ws_large_data.save_objects(save_params)[0] - returnVal = {'info': dfu_oi, 'warnings': data.get('warnings', [])} - return returnVal + def save_one_genome(self, params): + print('validating parameters') + mass_params = self._set_up_single_params(params) + return self._save_genome_mass(mass_params)[0] + + def save_genome_mass(self, params): + print('validating parameters') + self._validate_mass_params(params) + return self._save_genome_mass(params) + + def _set_up_single_params(self, params): + inputs = dict(params) + self._validate_genome_input_params(inputs) + ws_id = self._get_int(inputs.pop(_WSID, None), _WSID) + ws_name = inputs.pop('workspace', None) + if (bool(ws_id) == bool(ws_name)): # xnor + raise ValueError(f"Exactly one of a {_WSID} or a workspace_name must be provided") + if not ws_id: + print(f"Translating workspace name {ws_name} to a workspace ID. Prefer submitting " + + "a workspace ID over a mutable workspace name that may cause race conditions") + ws_id = self._dfu.ws_name_to_id(ws_name) + mass_params = {_WSID: ws_id, _INPUTS: [inputs]} + return mass_params + + def _validate_mass_params(self, params): + ws_id = self._get_int(params.get(_WSID), _WSID) + if not ws_id: + raise ValueError(f"{_WSID} is required") + inputs = params.get(_INPUTS) + if not inputs or type(inputs) != list: + raise ValueError(f"{_INPUTS} field is required and must be a non-empty list") + for i, inp in enumerate(inputs, start=1): + if type(inp) != dict: + raise ValueError(f"Entry #{i} in {_INPUTS} field is not a mapping as required") + # check required params in genome input + self._validate_genome_input_params(inp) + + def _save_genome_objects( + self, + workspace_id, + ws_datatypes, + data_paths, + names, + meta_data, + hidden_data, + ): + print('Saving Genomes to Workspace') + sys.stdout.flush() + ws_inputs = [] + + for ws_datatype, data_path, name, meta, hidden in zip( + ws_datatypes, data_paths, names, meta_data, hidden_data + ): + ws_inputs.append( + { + 'type': ws_datatype, + 'data_json_file': data_path, + 'name': name, + 'meta': meta, + 'hidden': hidden, + } + ) + return self.ws_large_data.save_objects( + {'id': workspace_id, 'objects': ws_inputs} + ) @staticmethod def determine_tier(source): @@ -436,3 +522,11 @@ def sizeof_fmt(num): f"permitted size of {sizeof_fmt(MAX_GENOME_SIZE)}.\n" f"Here is the breakdown for feature lists and their respective " f"sizes:\n{master_key_sizes}") + + def _get_int(self, putative_int, name, minimum=1): + if putative_int is not None: + if type(putative_int) != int: + raise ValueError(f"{name} must be an integer, got: {putative_int}") + if putative_int < minimum: + raise ValueError(f"{name} must be an integer >= {minimum}") + return putative_int