diff --git a/data_files/surveys/des/bbc/bbc_3yr.input b/data_files/surveys/des/bbc/bbc_3yr.input
index ee1aa8cf..b3b052f2 100644
--- a/data_files/surveys/des/bbc/bbc_3yr.input
+++ b/data_files/surveys/des/bbc/bbc_3yr.input
@@ -6,9 +6,6 @@ CONFIG:
 #END_YAML
 
-varname_pIa=NN_PROB_IA
-simfile_biascor=something.FITRES
-simfile_ccprior=something.FITRES
 
 surveygroup_biascor='CFA3K+CFA4p1+CFA4p2+CSP(zbin=0.02),DES(zbin=0.05)'
 idsurvey_list_probcc0=CFA3K,CFA4p1,CFA4p2,CSP,CFA3S
 surveygroup_biascor_abortflag=0
@@ -71,4 +68,4 @@ u12=0
 u13=0
 h0=70.0
 mag0=-30.00
-uave=1
\ No newline at end of file
+uave=1
diff --git a/pippin/biascor.py b/pippin/biascor.py
index 677acf4b..9e10ca26 100644
--- a/pippin/biascor.py
+++ b/pippin/biascor.py
@@ -7,7 +7,7 @@ from pippin.base import ConfigBasedExecutable
 from pippin.classifiers.classifier import Classifier
-from pippin.config import chown_dir, mkdirs, get_config, ensure_list, get_data_loc, read_yaml, make_tarfile
+from pippin.config import chown_dir, mkdirs, get_config, ensure_list, get_data_loc, read_yaml, compress_dir
 from pippin.merge import Merger
 from pippin.task import Task
@@ -188,8 +188,7 @@ def submit_reject_phase(self):
         os.remove(tar_file)
         shutil.move(self.fit_output_dir, moved)
-        make_tarfile(tar_file, moved)
-        shutil.rmtree(moved)
+        compress_dir(tar_file, moved)
 
         command = ["submit_batch_jobs.sh", os.path.basename(self.config_filename)]
         self.logger.debug(f"Running command: {' '.join(command)}")
diff --git a/pippin/config.py b/pippin/config.py
index 2dff668f..9c7a58dd 100644
--- a/pippin/config.py
+++ b/pippin/config.py
@@ -9,10 +9,17 @@
 import gzip
 
 
-def make_tarfile(output_filename, source_dir):
+def compress_dir(output_filename, source_dir):
+    logging.info(f"Compressing {source_dir} to {output_filename}")
     with tarfile.open(output_filename, "w:gz") as tar:
         tar.add(source_dir, arcname=os.path.basename(source_dir))
+    shutil.rmtree(source_dir)
+def uncompress_dir(output_dir, source_filename):
+    logging.info(f"Uncompressing {source_filename} to {output_dir}")
+    with tarfile.open(source_filename, "r:gz") as tar:
+        tar.extractall(path=output_dir)
+    os.remove(source_filename)
 
 
 def singleton(fn):
     instance = None
diff --git a/pippin/dataprep.py b/pippin/dataprep.py
index 5ab7fa7b..3c11077f 100644
--- a/pippin/dataprep.py
+++ b/pippin/dataprep.py
@@ -1,9 +1,9 @@
 import shutil
 import subprocess
+import tarfile
 import os
 from collections import OrderedDict
 from pathlib import Path
-
 from pippin.config import mkdirs, get_output_loc, get_config, get_data_loc, read_yaml, merge_dict
 from pippin.task import Task
 
diff --git a/pippin/manager.py b/pippin/manager.py
index 5e76d5d2..99d26f71 100644
--- a/pippin/manager.py
+++ b/pippin/manager.py
@@ -183,6 +183,9 @@ def fail_task(self, t):
         if os.path.exists(t.hash_file):
             os.remove(t.hash_file)
 
+        if self.compress:
+            t.compress()
+
         modified = True
         while modified:
             modified = False
@@ -191,9 +194,12 @@ def fail_task(self, t):
                     if d in self.failed or d in self.blocked:
                         self.tasks.remove(t2)
                         self.blocked.append(t2)
+                        if self.compress:
+                            t2.compress()
                         modified = True
                         break
 
+
     def log_status(self):
         self.logger.debug("")
         self.logger.debug(f"Status as of {time.ctime()}:")
@@ -266,11 +272,28 @@ def print_dashboard(self):
 
         self.logger.info("-------------------")
 
-    def execute(self, check_config):
+    def compress_all(self):
+        for t in self.tasks:
+            t.compress()
+
+    def uncompress_all(self):
+        for t in self.tasks:
+            t.uncompress()
+
+    def execute(self, check_config, compress_output, uncompress_output):
         self.logger.info(f"Executing pipeline for prefix {self.prefix}")
         self.logger.info(f"Output will be located in {self.output_dir}")
         if check_config:
             self.logger.info("Only verifying config, not launching anything")
+        assert not (compress_output and uncompress_output), "-C / --compress and -U / --uncompress are mutually exclusive"
+        # Whilst compressing is being debugged, false by default
+        self.compress = False
+        if compress_output:
+            self.compress = True
+            self.logger.info("Compressing output")
+        if uncompress_output:
+            self.compress = False
+            self.logger.info("Uncompressing output")
 
         mkdirs(self.output_dir)
         c = self.run_config
@@ -281,7 +304,12 @@ def execute(self, check_config):
 
         self.num_jobs_queue_gpu = 0
         squeue = None
+
         if check_config:
+            if compress_output:
+                self.compress_all()
+            if uncompress_output:
+                self.uncompress_all()
             self.logger.notice("Config verified, exiting")
             return
 
@@ -388,11 +416,14 @@ def check_task_completion(self, t, squeue):
                 self.running.remove(t)
                 self.logger.notice(f"FINISHED: {t} with {t.num_jobs} NUM_JOBS. NUM_JOBS now {self.num_jobs_queue}")
                 self.done.append(t)
-                self.logger.debug("Compressing task")
-                t.compress_task()
+                if self.compress:
+                    t.compress()
             else:
                 self.fail_task(t)
-            chown_dir(t.output_dir)
+            if os.path.exists(t.output_dir):
+                chown_dir(t.output_dir)
+            else:
+                chown_file(t.output_dir + ".tar.gz")
             return True
         return False
 
diff --git a/pippin/task.py b/pippin/task.py
index 121af88f..45080af9 100644
--- a/pippin/task.py
+++ b/pippin/task.py
@@ -1,7 +1,8 @@
 import logging
 import shutil
 from abc import ABC, abstractmethod
-from pippin.config import get_logger, get_hash, ensure_list, get_data_loc, read_yaml
+from pippin.config import get_logger, get_hash, ensure_list, get_data_loc, read_yaml, compress_dir, uncompress_dir
+import tarfile
 import os
 import datetime
 import numpy as np
@@ -55,16 +56,39 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
         if self.external is not None:
             self.logger.debug(f"External config stated to be {self.external}")
             self.external = get_data_loc(self.external)
-            if os.path.isdir(self.external):
-                self.external = os.path.join(self.external, "config.yml")
-            self.logger.debug(f"External config file path resolved to {self.external}")
-            with open(self.external, "r") as f:
-                external_config = yaml.load(f, Loader=yaml.Loader)
-                conf = external_config.get("CONFIG", {})
-                conf.update(self.config)
-                self.config = conf
-                self.output = external_config.get("OUTPUT", {})
-                self.logger.debug("Loaded external config successfully")
+            # External directory might be compressed
+            if not os.path.exists(self.external):
+                self.logger.warning(f"External config {self.external} does not exist, checking if it's compressed")
+                compressed_dir = self.external + ".tar.gz"
+                if not os.path.exists(compressed_dir):
+                    self.logger.error(f"{self.external} and {compressed_dir} do not exist")
+                else:
+                    self.external = compressed_dir
+                    self.logger.debug(f"External config file path resolved to {self.external}")
+                    with tarfile.open(self.external, "r:gz") as tar:
+                        for member in tar:
+                            if member.isfile():
+                                filename = os.path.basename(member.name)
+                                if filename != "config.yml":
+                                    continue
+                                with tar.extractfile(member) as f:
+                                    external_config = yaml.load(f, Loader=yaml.Loader)
+                                    conf = external_config.get("CONFIG", {})
+                                    conf.update(self.config)
+                                    self.config = conf
+                                    self.output = external_config.get("OUTPUT", {})
+                                    self.logger.debug("Loaded external config successfully")
+            else:
+                if os.path.isdir(self.external):
+                    self.external = os.path.join(self.external, "config.yml")
+                self.logger.debug(f"External config file path resolved to {self.external}")
+                with open(self.external, "r") as f:
+                    external_config = yaml.load(f, Loader=yaml.Loader)
+                    conf = external_config.get("CONFIG", {})
+                    conf.update(self.config)
+                    self.config = conf
+                    self.output = external_config.get("OUTPUT", {})
+                    self.logger.debug("Loaded external config successfully")
 
         self.hash = None
         self.hash_file = os.path.join(self.output_dir, "hash.txt")
@@ -126,8 +150,23 @@ def clean_header(self, header):
         header = '\n'.join(lines)
         return header
 
-    def compress_task(self):
-        pass
+    def compress(self):
+        if os.path.exists(self.output_dir):
+            output_file = self.output_dir + ".tar.gz"
+            compress_dir(output_file, self.output_dir)
+        for t in self.dependencies:
+            if os.path.exists(t.output_dir):
+                output_file = t.output_dir + ".tar.gz"
+                compress_dir(output_file, t.output_dir)
+
+    def uncompress(self):
+        source_file = self.output_dir + ".tar.gz"
+        if os.path.exists(source_file):
+            uncompress_dir(os.path.dirname(self.output_dir), source_file)
+        for t in self.dependencies:
+            source_file = t.output_dir + ".tar.gz"
+            if os.path.exists(source_file):
+                uncompress_dir(os.path.dirname(t.output_dir), source_file)
 
     def _check_regenerate(self, new_hash):
         hash_are_different = new_hash != self.get_old_hash()
@@ -231,6 +270,7 @@ def add_dependency(self, task):
         self.dependencies.append(task)
 
     def run(self):
+        self.uncompress()
         if self.external is not None:
             self.logger.debug(f"Name: {self.name} External: {self.external}")
             if os.path.exists(self.output_dir) and not self.force_refresh:
@@ -239,8 +279,16 @@ def run(self):
                 if os.path.exists(self.output_dir):
                     self.logger.debug(f"Removing old directory {self.output_dir}")
                     shutil.rmtree(self.output_dir, ignore_errors=True)
-                self.logger.info(f"Copying from {os.path.dirname(self.external)} to {self.output_dir}")
-                shutil.copytree(os.path.dirname(self.external), self.output_dir, symlinks=True)
+                if ".tar.gz" in self.external:
+                    tardir = os.path.basename(self.external).replace(".tar.gz", "")
+                    self.logger.info(f"Copying files from {self.external} to {self.output_dir}")
+
+                    shutil.copyfile(self.external, self.output_dir + '.tar.gz')
+                    self.uncompress()
+                    shutil.move(os.path.join(os.path.dirname(self.output_dir), tardir), self.output_dir)
+                else:
+                    self.logger.info(f"Copying from {os.path.dirname(self.external)} to {self.output_dir}")
+                    shutil.copytree(os.path.dirname(self.external), self.output_dir, symlinks=True)
             return True
         return self._run()
 
diff --git a/run.py b/run.py
index 748b8da6..1ee859f4 100644
--- a/run.py
+++ b/run.py
@@ -178,7 +178,7 @@ def handler(signum, frame):
     manager.set_finish(args.finish)
     manager.set_force_refresh(args.refresh)
    manager.set_force_ignore_stage(args.ignore)
-    manager.execute(args.check)
+    manager.execute(args.check, args.compress, args.uncompress)
     chown_file(logging_filename)
     return manager
 
@@ -235,6 +235,9 @@ def get_args(test=False):
     parser.add_argument("-p", "--permission", help="Fix permissions and groups on all output, don't rerun", action="store_true", default=False)
     parser.add_argument("-i", "--ignore", help="Dont rerun tasks with this stage or less. Accepts either the stage number of name (i.e. 1 or SIM)", default=None)
     parser.add_argument("-S", "--syntax", help="Get the syntax of the given stage. Accepts either the stage number or name (i.e. 1 or SIM). If run without argument, will tell you all stage numbers / names.", default=None, const="options", type=str, nargs='?')
+    command_group = parser.add_mutually_exclusive_group()
+    command_group.add_argument("-C", "--compress", help="Compress pippin output during job. Combine with -c / --check in order to compress completed pippin job.", action="store_true", default=False)
+    command_group.add_argument("-U", "--uncompress", help="Do not compress pippin output during job. Combine with -c / --check in order to uncompress completed pippin job. Mutually exclusive with -C / --compress", action="store_true", default=False)
     args = parser.parse_args()
 
     if args.syntax is not None:
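Note (not part of the patch above): a minimal sketch of how the new pippin.config helpers are intended to round-trip a task directory, assuming the patched pippin.config is importable and its shutil/os/logging dependencies are available. compress_dir() replaces a directory with a sibling .tar.gz, and uncompress_dir() restores the directory and removes the tarball; Task.compress()/Task.uncompress() apply exactly this per task, driven by the new -C / --compress and -U / --uncompress flags in run.py (combined with -c / --check they compress or uncompress an already-completed job without rerunning it). The scratch directory and task name below are hypothetical.

# Sketch only: round-trip of the new pippin.config helpers on a made-up directory.
import os
import tempfile

from pippin.config import compress_dir, uncompress_dir

workdir = tempfile.mkdtemp()                      # stand-in for a pippin output area
task_dir = os.path.join(workdir, "EXAMPLE_TASK")  # hypothetical task output directory
os.makedirs(task_dir)
with open(os.path.join(task_dir, "config.yml"), "w") as f:
    f.write("CONFIG: {}\n")

tarball = task_dir + ".tar.gz"
compress_dir(tarball, task_dir)    # writes EXAMPLE_TASK.tar.gz, then removes EXAMPLE_TASK/
assert os.path.exists(tarball) and not os.path.exists(task_dir)

uncompress_dir(workdir, tarball)   # restores EXAMPLE_TASK/, then removes the tarball
assert os.path.exists(task_dir) and not os.path.exists(tarball)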