Skip to content

Commit

Permalink
Added compressing and uncompression functionality. Disabled by default during trial period. #50
Browse files Browse the repository at this point in the history
  • Loading branch information
OmegaLambda1998 committed Jul 9, 2022
1 parent d3b6241 commit 7ac287d
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 29 deletions.
5 changes: 1 addition & 4 deletions data_files/surveys/des/bbc/bbc_3yr.input
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ CONFIG:

#END_YAML

varname_pIa=NN_PROB_IA
simfile_biascor=something.FITRES
simfile_ccprior=something.FITRES
surveygroup_biascor='CFA3K+CFA4p1+CFA4p2+CSP(zbin=0.02),DES(zbin=0.05)'
idsurvey_list_probcc0=CFA3K,CFA4p1,CFA4p2,CSP,CFA3S
surveygroup_biascor_abortflag=0
Expand Down Expand Up @@ -71,4 +68,4 @@ u12=0
u13=0
h0=70.0
mag0=-30.00
uave=1
uave=1
5 changes: 2 additions & 3 deletions pippin/biascor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pippin.base import ConfigBasedExecutable
from pippin.classifiers.classifier import Classifier
from pippin.config import chown_dir, mkdirs, get_config, ensure_list, get_data_loc, read_yaml, make_tarfile
from pippin.config import chown_dir, mkdirs, get_config, ensure_list, get_data_loc, read_yaml, compress_dir
from pippin.merge import Merger
from pippin.task import Task

Expand Down Expand Up @@ -188,8 +188,7 @@ def submit_reject_phase(self):
os.remove(tar_file)

shutil.move(self.fit_output_dir, moved)
make_tarfile(tar_file, moved)
shutil.rmtree(moved)
compress_dir(tar_file, moved)

command = ["submit_batch_jobs.sh", os.path.basename(self.config_filename)]
self.logger.debug(f"Running command: {' '.join(command)}")
Expand Down
9 changes: 8 additions & 1 deletion pippin/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@
import gzip


def make_tarfile(output_filename, source_dir):
def compress_dir(output_filename, source_dir):
    """Pack ``source_dir`` into a gzipped tarball, then delete the directory.

    The archive contains one top-level entry named after the last path
    component of ``source_dir``, so extracting it into the parent directory
    recreates the directory in place.

    Args:
        output_filename: Path of the ``.tar.gz`` file to write.
        source_dir: Directory to archive. It is removed once the archive has
            been written and closed.

    Raises:
        OSError, tarfile.TarError: Propagated from writing the archive. When
            the failure happens after the archive was opened, the partial
            archive is deleted and ``source_dir`` is left untouched.
    """
    logging.info(f"Compressing {source_dir} to {output_filename}")

    # A trailing separator makes os.path.basename() return "", which would
    # store the directory's contents at the archive root rather than under
    # the directory name. Strip it, but never reduce a bare root to "".
    source_dir = os.fspath(source_dir)
    if isinstance(source_dir, str):
        separators = os.sep + (os.altsep or "")
        source_dir = source_dir.rstrip(separators) or source_dir

    archive_opened = False
    try:
        with tarfile.open(output_filename, "w:gz") as tar:
            archive_opened = True
            tar.add(source_dir, arcname=os.path.basename(source_dir))
    except Exception:
        # Do not leave a truncated archive next to the intact directory: a
        # later existence check on the ".tar.gz" would mistake it for a
        # complete one. Only clean up a file this call actually opened.
        if archive_opened:
            try:
                os.remove(output_filename)
            except OSError:
                # Cleanup is best effort; the original error is re-raised.
                pass
        raise

    # Only reached when the archive was written and closed successfully.
    shutil.rmtree(source_dir)

def uncompress_dir(output_dir, source_filename):
    """Extract a gzipped tarball into ``output_dir``, then delete the archive.

    Args:
        output_dir: Directory the archive members are extracted into.
        source_filename: Path of the ``.tar.gz`` file to read. It is removed
            after extraction completes.
    """
    logging.info(f"Uncompressing {source_filename} to {output_dir}")
    archive = tarfile.open(source_filename, "r:gz")
    with archive:
        archive.extractall(path=output_dir)
    # Reached only when extraction did not raise, so a failed extraction
    # leaves the archive on disk.
    os.remove(source_filename)

def singleton(fn):
instance = None
Expand Down
2 changes: 1 addition & 1 deletion pippin/dataprep.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import shutil
import subprocess
import tarfile
import os
from collections import OrderedDict
from pathlib import Path

from pippin.config import mkdirs, get_output_loc, get_config, get_data_loc, read_yaml, merge_dict
from pippin.task import Task

Expand Down
39 changes: 35 additions & 4 deletions pippin/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def fail_task(self, t):
if os.path.exists(t.hash_file):
os.remove(t.hash_file)

if self.compress:
t.compress()

modified = True
while modified:
modified = False
Expand All @@ -191,9 +194,12 @@ def fail_task(self, t):
if d in self.failed or d in self.blocked:
self.tasks.remove(t2)
self.blocked.append(t2)
if self.compress:
t2.compress()
modified = True
break


def log_status(self):
self.logger.debug("")
self.logger.debug(f"Status as of {time.ctime()}:")
Expand Down Expand Up @@ -266,11 +272,28 @@ def print_dashboard(self):

self.logger.info("-------------------")

def execute(self, check_config):
def compress_all(self):
    """Call ``compress()`` on every task currently held in ``self.tasks``."""
    for task in self.tasks:
        task.compress()

def uncompress_all(self):
    """Call ``uncompress()`` on every task currently held in ``self.tasks``."""
    for task in self.tasks:
        task.uncompress()

def execute(self, check_config, compress_output, uncompress_output):
self.logger.info(f"Executing pipeline for prefix {self.prefix}")
self.logger.info(f"Output will be located in {self.output_dir}")
if check_config:
self.logger.info("Only verifying config, not launching anything")
assert not (compress_output and uncompress_output), "-C / --compress and -U / --uncompress are mutually exclusive"
# Whilst compressing is being debugged, false by default
self.compress = False
if compress_output:
self.compress = True
self.logger.info("Compressing output")
if uncompress_output:
self.compress = False
self.logger.info("Uncompressing output")

mkdirs(self.output_dir)
c = self.run_config
Expand All @@ -281,7 +304,12 @@ def execute(self, check_config):
self.num_jobs_queue_gpu = 0
squeue = None


if check_config:
if compress_output:
self.compress_all()
if uncompress_output:
self.uncompress_all()
self.logger.notice("Config verified, exiting")
return

Expand Down Expand Up @@ -388,11 +416,14 @@ def check_task_completion(self, t, squeue):
self.running.remove(t)
self.logger.notice(f"FINISHED: {t} with {t.num_jobs} NUM_JOBS. NUM_JOBS now {self.num_jobs_queue}")
self.done.append(t)
self.logger.debug("Compressing task")
t.compress_task()
if self.compress:
t.compress()
else:
self.fail_task(t)
chown_dir(t.output_dir)
if os.path.exists(t.output_dir):
chown_dir(t.output_dir)
else:
chown_file(t.output_dir + ".tar.gz")
return True
return False

Expand Down
78 changes: 63 additions & 15 deletions pippin/task.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import shutil
from abc import ABC, abstractmethod
from pippin.config import get_logger, get_hash, ensure_list, get_data_loc, read_yaml
from pippin.config import get_logger, get_hash, ensure_list, get_data_loc, read_yaml, compress_dir, uncompress_dir
import tarfile
import os
import datetime
import numpy as np
Expand Down Expand Up @@ -55,16 +56,39 @@ def __init__(self, name, output_dir, dependencies=None, config=None, done_file="
if self.external is not None:
self.logger.debug(f"External config stated to be {self.external}")
self.external = get_data_loc(self.external)
if os.path.isdir(self.external):
self.external = os.path.join(self.external, "config.yml")
self.logger.debug(f"External config file path resolved to {self.external}")
with open(self.external, "r") as f:
external_config = yaml.load(f, Loader=yaml.Loader)
conf = external_config.get("CONFIG", {})
conf.update(self.config)
self.config = conf
self.output = external_config.get("OUTPUT", {})
self.logger.debug("Loaded external config successfully")
# External directory might be compressed
if not os.path.exists(self.external):
self.logger.warning(f"External config {self.external} does not exist, checking if it's compressed")
compressed_dir = self.external + ".tar.gz"
if not os.path.exists(compressed_dir):
self.logger.error(f"{self.external} and {compressed_dir} do not exist")
else:
self.external = compressed_dir
self.logger.debug(f"External config file path resolved to {self.external}")
with tarfile.open(self.external, "r:gz") as tar:
for member in tar:
if member.isfile():
filename = os.path.basename(member.name)
if filename != "config.yml":
continue
with tar.extractfile(member) as f:
external_config = yaml.load(f, Loader=yaml.Loader)
conf = external_config.get("CONFIG", {})
conf.update(self.config)
self.config = conf
self.output = external_config.get("OUTPUT", {})
self.logger.debug("Loaded external config successfully")
else:
if os.path.isdir(self.external):
self.external = os.path.join(self.external, "config.yml")
self.logger.debug(f"External config file path resolved to {self.external}")
with open(self.external, "r") as f:
external_config = yaml.load(f, Loader=yaml.Loader)
conf = external_config.get("CONFIG", {})
conf.update(self.config)
self.config = conf
self.output = external_config.get("OUTPUT", {})
self.logger.debug("Loaded external config successfully")

self.hash = None
self.hash_file = os.path.join(self.output_dir, "hash.txt")
Expand Down Expand Up @@ -126,8 +150,23 @@ def clean_header(self, header):
header = '\n'.join(lines)
return header

def compress_task(self):
pass
def compress(self):
    """Archive this task's output directory, then each dependency's.

    A directory is archived to ``<directory>.tar.gz`` via ``compress_dir``
    only if it currently exists on disk; missing directories are skipped.
    """

    def _archive(directory):
        # Skip anything that is not on disk (e.g. already archived).
        if os.path.exists(directory):
            compress_dir(directory + ".tar.gz", directory)

    _archive(self.output_dir)
    for dependency in self.dependencies:
        _archive(dependency.output_dir)

def uncompress(self):
    """Restore this task's output directory, then each dependency's.

    For each directory, ``<directory>.tar.gz`` is extracted into the
    directory's parent via ``uncompress_dir`` when that archive exists;
    directories without an archive are skipped.
    """

    def _restore(directory):
        archive = directory + ".tar.gz"
        if os.path.exists(archive):
            # Extract into the parent so the archived top-level entry lands
            # beside the archive.
            uncompress_dir(os.path.dirname(directory), archive)

    _restore(self.output_dir)
    for dependency in self.dependencies:
        _restore(dependency.output_dir)

def _check_regenerate(self, new_hash):
hash_are_different = new_hash != self.get_old_hash()
Expand Down Expand Up @@ -231,6 +270,7 @@ def add_dependency(self, task):
self.dependencies.append(task)

def run(self):
self.uncompress()
if self.external is not None:
self.logger.debug(f"Name: {self.name} External: {self.external}")
if os.path.exists(self.output_dir) and not self.force_refresh:
Expand All @@ -239,8 +279,16 @@ def run(self):
if os.path.exists(self.output_dir):
self.logger.debug(f"Removing old directory {self.output_dir}")
shutil.rmtree(self.output_dir, ignore_errors=True)
self.logger.info(f"Copying from {os.path.dirname(self.external)} to {self.output_dir}")
shutil.copytree(os.path.dirname(self.external), self.output_dir, symlinks=True)
if ".tar.gz" in self.external:
tardir = os.path.basename(self.external).replace(".tar.gz", "")
self.logger.info(f"Copying files from {self.external} to {self.output_dir}")

shutil.copyfile(self.external, self.output_dir + '.tar.gz')
self.uncompress()
shutil.move(os.path.join(os.path.dirname(self.output_dir), tardir), self.output_dir)
else:
self.logger.info(f"Copying from {os.path.dirname(self.external)} to {self.output_dir}")
shutil.copytree(os.path.dirname(self.external), self.output_dir, symlinks=True)
return True

return self._run()
Expand Down
5 changes: 4 additions & 1 deletion run.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def handler(signum, frame):
manager.set_finish(args.finish)
manager.set_force_refresh(args.refresh)
manager.set_force_ignore_stage(args.ignore)
manager.execute(args.check)
manager.execute(args.check, args.compress, args.uncompress)
chown_file(logging_filename)
return manager

Expand Down Expand Up @@ -235,6 +235,9 @@ def get_args(test=False):
parser.add_argument("-p", "--permission", help="Fix permissions and groups on all output, don't rerun", action="store_true", default=False)
parser.add_argument("-i", "--ignore", help="Dont rerun tasks with this stage or less. Accepts either the stage number of name (i.e. 1 or SIM)", default=None)
parser.add_argument("-S", "--syntax", help="Get the syntax of the given stage. Accepts either the stage number or name (i.e. 1 or SIM). If run without argument, will tell you all stage numbers / names.", default=None, const="options", type=str, nargs='?')
command_group = parser.add_mutually_exclusive_group()
command_group.add_argument("-C", "--compress", help="Compress pippin output during job. Combine with -c / --check in order to compress completed pippin job.", action="store_true", default=False)
command_group.add_argument("-U", "--uncompress", help="Do not compress pippin output during job. Combine with -c / --check in order to uncompress completed pippin job. Mutually exclusive with -C / --compress", action="store_true", default=False)
args = parser.parse_args()

if args.syntax is not None:
Expand Down

0 comments on commit 7ac287d

Please sign in to comment.