Skip to content

Commit

Permalink
make threads deamons
Browse files Browse the repository at this point in the history
  • Loading branch information
JannisNe committed Jan 28, 2022
1 parent d4a8651 commit cec4e1b
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions timewise/wise_bigdata_desy_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def get_sample_photometric_data(self, max_nTAPjobs=8, perc=1, tables=None, chunk
self.clear_cluster_log_dir()
self._save_cluster_info()

tap_threads = [threading.Thread(target=self._tap_thread) for _ in range(max_nTAPjobs)]
cluster_threads = [threading.Thread(target=self._cluster_thread) for _ in range(max_nTAPjobs)]
tap_threads = [threading.Thread(target=self._tap_thread, daemon=True) for _ in range(max_nTAPjobs)]
cluster_threads = [threading.Thread(target=self._cluster_thread, daemon=True) for _ in range(max_nTAPjobs)]

for t in tap_threads + cluster_threads:
logger.debug('starting thread')
Expand All @@ -103,16 +103,14 @@ def get_sample_photometric_data(self, max_nTAPjobs=8, perc=1, tables=None, chunk
self._cluster_queue.join()
logger.debug('cluster done')

del tap_threads
del cluster_threads

self._combine_lcs(service=service, overwrite=True, remove=remove_chunks)
self._combine_metadata(service=service, overwrite=True, remove=remove_chunks)

def _tap_thread(self):
logger.debug('started tap thread')
while True:
tables, chunk, wait, mag, flux, cluster_time = self._tap_queue.get()
tables, chunk, wait, mag, flux, cluster_time = self._tap_queue.get(block=True)
logger.debug(f'querying IRSA for chunk {chunk}')

for t in tables:
Expand All @@ -135,7 +133,7 @@ def _tap_thread(self):
def _cluster_thread(self):
logger.debug('started cluster thread')
while True:
job_id, chunk = self._cluster_queue.get()
job_id, chunk = self._cluster_queue.get(block=True)
logger.debug(f'waiting for chunk {chunk} (Cluster job {job_id})')
self.wait_for_job(job_id)
logger.debug(f'cluster done for chunk {chunk} (Cluster job {job_id}). Start combining')
Expand Down Expand Up @@ -313,7 +311,6 @@ def _make_cluster_script(self, cluster_h, cluster_ram, tables, service):
f'--base_name {self.base_name} ' \
f'--min_sep_arcsec {self.min_sep.to("arcsec").value} ' \
f'--n_chunks {self._n_chunks} ' \
f'--service {service} ' \
f'--job_id $SGE_TASK_ID ' \
f'{tables_str}' \
'cp $TMPDIR/${JOB_ID}_${SGE_TASK_ID}_stdout.txt ' + self.cluster_log_dir + '\n' \
Expand Down

0 comments on commit cec4e1b

Please sign in to comment.