Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to archive the workdir #980

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ def submit(wf_name: str = typer.Argument(..., help='the workflow name'), # pyli
workdir: pathlib.Path = typer.Argument(...,
help='working directory for workflow containing input + output files',),
no_start: bool = typer.Option(False, '--no-start', '-n',
help='do not start the workflow')):
help='do not start the workflow'),
archive_workdir: bool = typer.Option(False, '--archive-workdir', '-a',
help='archive a copy of the workdir')):
"""Submit a new workflow."""
def is_parent(parent, path):
"""Return true if the path is a child of the other path."""
Expand Down Expand Up @@ -312,6 +314,51 @@ def is_parent(parent, path):
if os.path.commonpath([os.path.realpath('/var/tmp'), workdir]) == os.path.realpath('/var/tmp'):
error_exit("Workflow working directory cannot be in \"/var/tmp\"")

# TODO: this should be in a function somewhere
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find some kind of python package that would do this logic, but didn't come up with anything.

I'm not sure the best place to put this function. It should likely also have some tests.

total_size = 0
size_limit_mb = 1000
total_size_limit = size_limit_mb * 1024 ** 2
n_files = 0
n_files_limit = 10_000
n_dirs = 0
n_dirs_limit = 1000
reason = ''
# check the workdir isn't too large/contains too many things
for dirpath, dirnames, filenames in os.walk(workdir):
n_files += len(filenames)
if n_files > n_files_limit:
reason = f'Directory contains more than {n_files_limit} files'
break
n_dirs += len(dirnames)
if n_dirs > n_dirs_limit:
reason = f'Directory contains more than {n_dirs_limit} subdirectories'
break
for fname in filenames:
fp = os.path.join(dirpath, fname)
# skip if it is symbolic link
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
if total_size > total_size_limit:
reason = f'Total file size of directory is greater than {size_limit_mb}MB'
break
else:
continue
break

if len(reason) > 0 and archive_workdir:
archive_workdir = False
ans = input(f"""
******************
** WARNING **
******************
Are you sure you want to archive the workdir at:

{workdir}

{reason} [y/n]? """)
if ans.lower() in ("y", "yes"):
archive_workdir = True

# TODO: Can all of this information be sent as a file?
data = {
'wf_name': wf_name.encode(),
Expand All @@ -320,6 +367,7 @@ def is_parent(parent, path):
'workflow': jsonpickle.encode(workflow),
'tasks': jsonpickle.encode(tasks, warn=True),
'no_start': no_start,
'archive_workdir': archive_workdir,
}
files = {
'workflow_archive': wf_tarball
Expand Down
3 changes: 2 additions & 1 deletion beeflow/common/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def run(self):
bee_client.package(Path(self.path), Path(tarball_dir))
print('Submitting and starting workflow')
self.wf_id = bee_client.submit(self.name, self.tarball, self.main_cwl,
self.job_file, self.workdir, no_start=False)
self.job_file, self.workdir, no_start=False,
archive_workdir=False)
except bee_client.ClientError as error:
raise CIError(*error.args) from error

Expand Down
3 changes: 2 additions & 1 deletion beeflow/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def submit_new_wf_long(wf_name: str, tarball_name: str, main_cwl_file: str, job_
main_cwl_file,
job_file,
workdir_path,
no_start=False)
no_start=False,
archive_workdir=False)
output["result"] = "Submitted new workflow" + str(wf_name)
return output
except bee_client.ClientError as error:
Expand Down
9 changes: 6 additions & 3 deletions beeflow/wf_manager/resources/wf_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ def extract_wf(wf_id, filename, workflow_archive):

@shared_task(ignore_result=True)
def init_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
                  tasks=None, archive_workdir=False):
    """Initialize the workflow in a separate process.

    Registered as a shared task (``ignore_result=True``) so workflow setup
    runs asynchronously instead of blocking the request handler that
    enqueued it.

    :param wf_id: unique workflow ID
    :param wf_name: human-readable workflow name
    :param wf_dir: directory holding the extracted workflow files
    :param wf_workdir: working directory containing workflow input/output files
    :param no_start: if True, set the workflow up but do not start it
    :param workflow: decoded workflow object, or None
    :param tasks: decoded list of task objects, or None
    :param archive_workdir: if True, mark the workdir to be copied into the
        workflow archive on completion (stored per-task in task metadata)
    """
    db = connect_db(wfm_db, db_path)
    # The Neo4j driver must be connected before the workflow graph can be built.
    wf_utils.connect_neo4j_driver(db.info.get_port('bolt'))
    wf_utils.setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start,
                            workflow, tasks, archive_workdir)


db_path = wf_utils.get_db_path()
Expand Down Expand Up @@ -93,6 +93,7 @@ def post(self):
reqparser.add_argument('tasks', type=str, required=True,
location='form')
reqparser.add_argument('no_start', type=str, required=True, location='form')
reqparser.add_argument('archive_workdir', type=str, required=True, location='form')
reqparser.add_argument('workflow_archive', type=FileStorage, required=False,
location='files')
data = reqparser.parse_args()
Expand All @@ -102,6 +103,7 @@ def post(self):
wf_workdir = data['workdir']
# Note we have to check for the 'true' string value
no_start = data['no_start'].lower() == 'true'
archive_workdir = data['archive_workdir'].lower() == 'true'
workflow = jsonpickle.decode(data['workflow'])
# May have to decode the list and task objects separately
tasks = [jsonpickle.decode(task) if isinstance(task, str) else task
Expand All @@ -113,7 +115,8 @@ def post(self):
db.workflows.init_workflow(wf_id, wf_name, wf_dir)

init_workflow.delay(wf_id, wf_name, wf_dir, wf_workdir,
no_start, workflow=workflow, tasks=tasks)
no_start, workflow=workflow, tasks=tasks,
archive_workdir=archive_workdir)

return make_response(jsonify(msg='Workflow uploaded', status='ok',
wf_id=wf_id), 201)
Expand Down
10 changes: 10 additions & 0 deletions beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def archive_workflow(db, wf_id, final_state=None):
os.makedirs(dags_dir, exist_ok=True)
wf_utils.export_dag(wf_id, dags_dir, graphmls_dir, no_dag_dir=True, copy_dag_in_archive=False)

# Archive workdir
wfi = wf_utils.get_workflow_interface(wf_id)
_, tasks = wfi.get_workflow()
# metadata related to the workdir is redundantly stored on each task
archive_workdir = wfi.get_task_metadata(tasks[0])['archive_workdir']
if archive_workdir:
workdir = wfi.get_task_metadata(tasks[0])['workdir']
workdir_dir = workflow_dir + "/workdir"
shutil.copytree(workdir, workdir_dir, dirs_exist_ok=True)

wf_state = f'Archived/{final_state}' if final_state is not None else 'Archived'
db.workflows.update_workflow_state(wf_id, wf_state)
wf_utils.update_wf_status(wf_id, wf_state)
Expand Down
3 changes: 2 additions & 1 deletion beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def connect_neo4j_driver(bolt_port):


def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
tasks=None):
tasks=None, archive_workdir=False):
"""Initialize Workflow in Separate Process."""
wfi = get_workflow_interface(wf_id)
wfi.initialize_workflow(workflow)
Expand All @@ -279,6 +279,7 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
wfi.add_task(task, task_state)
metadata = wfi.get_task_metadata(task)
metadata['workdir'] = wf_workdir
metadata['archive_workdir'] = archive_workdir
wfi.set_task_metadata(task, metadata)
db.workflows.add_task(task.id, wf_id, task.name, task_state)

Expand Down
Loading