diff --git a/timewise/wise_bigdata_desy_cluster.py b/timewise/wise_bigdata_desy_cluster.py index 4338ed0..82113f9 100644 --- a/timewise/wise_bigdata_desy_cluster.py +++ b/timewise/wise_bigdata_desy_cluster.py @@ -85,8 +85,8 @@ def get_sample_photometric_data(self, max_nTAPjobs=8, perc=1, tables=None, chunk self.clear_cluster_log_dir() self._save_cluster_info() - tap_threads = [threading.Thread(target=self._tap_thread) for _ in range(max_nTAPjobs)] - cluster_threads = [threading.Thread(target=self._cluster_thread) for _ in range(max_nTAPjobs)] + tap_threads = [threading.Thread(target=self._tap_thread, daemon=True) for _ in range(max_nTAPjobs)] + cluster_threads = [threading.Thread(target=self._cluster_thread, daemon=True) for _ in range(max_nTAPjobs)] for t in tap_threads + cluster_threads: logger.debug('starting thread') @@ -103,8 +103,6 @@ def get_sample_photometric_data(self, max_nTAPjobs=8, perc=1, tables=None, chunk self._cluster_queue.join() logger.debug('cluster done') - del tap_threads - del cluster_threads self._combine_lcs(service=service, overwrite=True, remove=remove_chunks) self._combine_metadata(service=service, overwrite=True, remove=remove_chunks) @@ -112,7 +110,7 @@ def get_sample_photometric_data(self, max_nTAPjobs=8, perc=1, tables=None, chunk def _tap_thread(self): logger.debug('started tap thread') while True: - tables, chunk, wait, mag, flux, cluster_time = self._tap_queue.get() + tables, chunk, wait, mag, flux, cluster_time = self._tap_queue.get(block=True) logger.debug(f'querying IRSA for chunk {chunk}') for t in tables: @@ -135,7 +133,7 @@ def _tap_thread(self): def _cluster_thread(self): logger.debug('started cluster thread') while True: - job_id, chunk = self._cluster_queue.get() + job_id, chunk = self._cluster_queue.get(block=True) logger.debug(f'waiting for chunk {chunk} (Cluster job {job_id})') self.wait_for_job(job_id) logger.debug(f'cluster done for chunk {chunk} (Cluster job {job_id}). Start combining') @@ -313,7 +311,6 @@ def _make_cluster_script(self, cluster_h, cluster_ram, tables, service): f'--base_name {self.base_name} ' \ f'--min_sep_arcsec {self.min_sep.to("arcsec").value} ' \ f'--n_chunks {self._n_chunks} ' \ - f'--service {service} ' \ f'--job_id $SGE_TASK_ID ' \ f'{tables_str}' \ 'cp $TMPDIR/${JOB_ID}_${SGE_TASK_ID}_stdout.txt ' + self.cluster_log_dir + '\n' \