Skip to content

Commit

Permalink
Merge pull request #13 from ZihengSun/main
Browse files Browse the repository at this point in the history
sync
  • Loading branch information
ZihengSun authored Jun 10, 2023
2 parents 9134c4c + 5cc3b75 commit ab74bc2
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pygeoweaver/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from pygeoweaver import detail_host, detail_process, detail_workflow, export_workflow, \
show_history, import_workflow, list_hosts, list_processes, list_workflows, \
start, stop, reset_password, run_process, run_worklfow, helpwith
start, stop, reset_password, run_process, run_workflow, helpwith
from pygeoweaver.server import show


Expand Down
1 change: 1 addition & 0 deletions pygeoweaver/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@

# Base URL of the Geoweaver server on localhost; REST helpers append their
# endpoint paths (e.g. "/web/add/process") to it.
GEOWEAVER_DEFAULT_ENDPOINT_URL = "http://localhost:8070/Geoweaver"

# Header set that marks a request body as JSON.
COMMON_API_HEADER = {
    'Content-Type': 'application/json',
}
8 changes: 4 additions & 4 deletions pygeoweaver/jdk_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ def install_jdk_linux(jdk_version, jdk_arch):
jdk_install_dir = os.path.expanduser('~/jdk')

# Download JDK archive
download_file(jdk_url, 'jdk.tar.gz')
download_file(jdk_url, f'{get_home_dir()}/jdk.tar.gz')

# Extract JDK archive
extract_tar_archive('jdk.tar.gz', jdk_install_dir)
extract_tar_archive(f'{get_home_dir()}/jdk.tar.gz', jdk_install_dir)

# Set JDK environment variables
set_jdk_env_vars(f'{jdk_install_dir}/jdk-{jdk_version.replace("-", "+")}')
Expand All @@ -76,10 +76,10 @@ def install_jdk_windows(jdk_version, jdk_arch):
jdk_install_dir = os.path.expanduser('~/jdk')

# Download JDK archive
download_file(jdk_url, 'jdk.zip')
download_file(jdk_url, f'{get_home_dir()}/jdk.zip')

# Extract JDK archive
extract_zip_archive('jdk.zip', jdk_install_dir)
extract_zip_archive(f'{get_home_dir()}/jdk.zip', jdk_install_dir)

# Set JDK environment variables
set_jdk_env_vars(f'{jdk_install_dir}/jdk-{jdk_version.replace("-", "+")}')
Expand Down
132 changes: 132 additions & 0 deletions pygeoweaver/sc_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import json
import requests
import pandas as pd
from pydantic import BaseModel

from . import constants
from pygeoweaver.utils import download_geoweaver_jar, get_geoweaver_jar_path, get_java_bin_path, get_root_dir, \
check_ipython


class ProcessData(BaseModel):
    """
    Request payload describing a process to be created.

    ``create_process`` builds an instance of this model, serializes it with
    ``.json()`` and posts the result to the ``/web/add/process`` endpoint.
    """
    type: str = "process"  # resource kind; defaults to "process"
    lang: str  # programming language of the process
    description: str  # description of the process
    name: str  # name of the process
    code: str  # code of the process
    owner: str = "111111"  # owner of the process; defaults to "111111"
    confidential: bool = False  # confidentiality status; defaults to False


class WorkflowData(BaseModel):
    """
    Request payload describing a workflow to be created.

    ``create_workflow`` builds an instance of this model, serializes it with
    ``.json()`` and posts the result to the ``/web/add/workflow`` endpoint.
    """
    type: str = "workflow"  # resource kind; defaults to "workflow"
    confidential: bool = False  # confidentiality status; defaults to False
    description: str  # description of the workflow
    edges: str  # edges of the workflow, passed as a string
    name: str  # name of the workflow
    nodes: str  # nodes of the workflow, passed as a string
    owner: str = "111111"  # owner of the workflow; defaults to "111111"


def create_process(lang, description, name, code, owner="111111", confidential=False):
    """
    Create a process on the Geoweaver server from the given fields.

    The fields are validated through ``ProcessData`` and posted as JSON to the
    ``/web/add/process`` endpoint.

    :param lang: The programming language of the process
    :type lang: str
    :param description: The description of the process
    :type description: str
    :param name: The name of the process
    :type name: str
    :param code: The code of the process
    :type code: str
    :param owner: The owner of the process, defaults to "111111"
    :type owner: str, optional
    :param confidential: The confidentiality status of the process, defaults to False
    :type confidential: bool, optional
    :return: When running under IPython and the request succeeded, a
        ``Key``/``Value`` DataFrame of the submitted payload; otherwise the
        decoded JSON body of the server response
    :rtype: pandas.DataFrame or dict
    """
    download_geoweaver_jar()
    payload = ProcessData(
        type="process",
        lang=lang,
        description=description,
        name=name,
        code=code,
        owner=owner,
        confidential=confidential,
    ).json()
    endpoint = f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/add/process"
    response = requests.post(endpoint, data=payload, headers=constants.COMMON_API_HEADER)
    # Anything other than "IPython and a successful request" yields the raw reply.
    if not (check_ipython() and response.ok):
        return response.json()
    return pd.DataFrame(json.loads(payload).items(), columns=['Key', 'Value'])


def create_process_from_file(lang, description, name, file_path, owner="111111", confidential=False):
    """
    Create a process whose code is read from a file.

    The file is read in text mode and its content is handed to
    ``create_process`` together with the remaining arguments.

    :param lang: The programming language of the process.
    :type lang: str
    :param description: The description of the process.
    :type description: str
    :param name: The name of the process.
    :type name: str
    :param file_path: The path to the file containing the code.
    :type file_path: str
    :param owner: The owner of the process, defaults to "111111".
    :type owner: str, optional
    :param confidential: The confidentiality status of the process, defaults to False.
    :type confidential: bool, optional
    :return: Whatever ``create_process`` returns for the loaded code.
    """
    with open(file_path, 'r') as source:
        source_code = source.read()
    return create_process(
        lang,
        description,
        name,
        source_code,
        owner=owner,
        confidential=confidential,
    )


def create_workflow(description, edges, name, nodes, owner="111111", confidential=False):
    """
    Create a workflow on the Geoweaver server from the given fields.

    The fields are validated through ``WorkflowData`` and posted as JSON to
    the ``/web/add/workflow`` endpoint.

    :param description: The description of the workflow
    :type description: str
    :param edges: The edges of the workflow
    :type edges: str
    :param name: The name of the workflow
    :type name: str
    :param nodes: The nodes of the workflow
    :type nodes: str
    :param owner: The owner of the workflow, defaults to "111111"
    :type owner: str, optional
    :param confidential: The confidentiality status of the workflow, defaults to False
    :type confidential: bool, optional
    :return: When running under IPython and the request succeeded, a
        ``Key``/``Value`` DataFrame of the submitted payload; otherwise the
        decoded JSON body of the server response
    :rtype: pandas.DataFrame or dict
    """
    download_geoweaver_jar()
    workflow = WorkflowData(
        type="workflow",
        confidential=confidential,
        description=description,
        edges=edges,
        name=name,
        nodes=nodes,
        owner=owner,
    )
    data_json = workflow.json()
    r = requests.post(
        f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/add/workflow",
        data=data_json,
        headers=constants.COMMON_API_HEADER,
    )
    # Only echo the payload back as a table when the server accepted it, the
    # same way create_process does; a failed request must surface the server's
    # reply instead of looking like a success.
    if check_ipython() and r.ok:
        return pd.DataFrame(json.loads(data_json).items(), columns=['Key', 'Value'])
    return r.json()
41 changes: 40 additions & 1 deletion pygeoweaver/sc_history.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import subprocess

import pandas as pd
import requests

from . import constants
from pygeoweaver.utils import download_geoweaver_jar, get_geoweaver_jar_path, get_java_bin_path, get_root_dir


Expand All @@ -9,4 +14,38 @@ def show_history(history_id):
if not history_id:
raise RuntimeError("history id is missing")
download_geoweaver_jar()
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "history", history_id], cwd=f"{get_root_dir()}/")
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "history", history_id],
cwd=f"{get_root_dir()}/")


def get_process_history(process_id):
    """
    Get the list of history records for a process.

    The ``/web/logs`` endpoint of the Geoweaver server is queried first. If
    that request (or turning its reply into a DataFrame) fails, the Geoweaver
    CLI command ``process-history`` is run instead.

    :param process_id: id of the process whose history is requested
    :type process_id: str
    :return: DataFrame built from the server's JSON reply, or ``None`` when
        the CLI fallback was used
    :rtype: pandas.DataFrame or None
    :raises Exception: if ``process_id`` is empty
    """
    if not process_id:
        raise Exception("please pass `process_id` as a parameter to the function.")
    download_geoweaver_jar()
    try:
        r = requests.post(f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/logs",
                          data={'type': 'process', 'id': process_id}).json()
        return pd.DataFrame(r)
    except Exception:
        # Deliberate best-effort: any failure of the REST path falls back to
        # the CLI. The command is passed as an argument list (no shell) so
        # that paths containing spaces work and the id cannot be interpreted
        # as shell syntax.
        subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(),
                        "process-history", str(process_id)],
                       cwd=f"{get_root_dir()}/")


def get_workflow_history(workflow_id):
    """
    Get the list of history records for a workflow.

    The ``/web/logs`` endpoint of the Geoweaver server is queried first. If
    that request (or turning its reply into a DataFrame) fails, the Geoweaver
    CLI command ``workflow-history`` is run instead.

    :param workflow_id: id of the workflow whose history is requested
    :type workflow_id: str
    :return: DataFrame built from the server's JSON reply, or ``None`` when
        the CLI fallback was used
    :rtype: pandas.DataFrame or None
    :raises Exception: if ``workflow_id`` is empty
    """
    if not workflow_id:
        raise Exception("please pass `workflow_id` as a parameter to the function.")
    try:
        r = requests.post(f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/logs",
                          data={'type': 'workflow', 'id': workflow_id}).json()
        return pd.DataFrame(r)
    except Exception:
        # Deliberate best-effort: any failure of the REST path falls back to
        # the CLI. The CLI runs from the jar, so make sure it is present
        # before invoking it.
        download_geoweaver_jar()
        # Argument list (no shell): paths containing spaces work and the id
        # cannot be interpreted as shell syntax.
        subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(),
                        "workflow-history", str(workflow_id)],
                       cwd=f"{get_root_dir()}/")
2 changes: 2 additions & 0 deletions pygeoweaver/sc_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@
from pygeoweaver.server import *
from pygeoweaver.sc_resetpassword import *
from pygeoweaver.sc_help import *
from pygeoweaver.sc_create import *
from pygeoweaver.sc_sync import *
98 changes: 75 additions & 23 deletions pygeoweaver/sc_run.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
import json
import os
import subprocess

import requests

from . import constants
from pygeoweaver.utils import download_geoweaver_jar, get_geoweaver_jar_path, get_java_bin_path, get_root_dir


def run_process(*, process_id: str, host_id: str, password: str, environment: str=None):
def create_process():
    """Placeholder with an empty body; takes no arguments and returns None."""
    pass


def create_workflow():
    """
    Create workflow from workflow.json

    NOTE(review): the body is currently empty, so nothing is created yet and
    the function returns None.

    :return: None
    :rtype: None
    """
    pass


def run_process(*, process_id: str, host_id: str, password: str, environment: str = None,
sync_path: os.PathLike = None):
"""
Run a process
Expand All @@ -11,13 +31,44 @@ def run_process(*, process_id: str, host_id: str, password: str, environment: st
password - required
environment - optional
"""
if sync_path:
ext, matching_dict = None, None
process_file = os.path.exists(os.path.join(sync_path, 'code', 'process.json'))
if not process_file:
print("process file does not exists, please check the path")
return
p_file = json.loads(open(os.path.join(sync_path, 'code', 'process.json'), "r").read())
for item in p_file:
if item.get("id") == process_id:
matching_dict = item
break
if not matching_dict:
print("Could not find the file, please check the path")
return
if matching_dict['lang'] == "python":
ext = ".py"
if matching_dict['lang'] == "bash":
ext = ".bash"
if not ext:
print("Invalid file format.")
source_filename = matching_dict['name'] + ext
source_file_exists = os.path.exists(os.path.join(sync_path, 'code', source_filename))
if source_file_exists:
f = open(os.path.join(sync_path, 'code', source_filename), "r").read()
matching_dict['code'] = f
requests.post(f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/edit/process",
data=json.dumps(matching_dict), headers={'Content-Type': 'application/json'})
else:
print("File does not exists")
download_geoweaver_jar()
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "process", f"--host={host_id}",
f"--password={password}", f"--environment={environment}", process_id],
cwd=f"{get_root_dir()}/")

def run_worklfow(*, workflow_id: str, workflow_folder_path: str=None, workflow_zip_file_path: str=None,
environment_list: str=None, host_list: str, password_list: str):

def run_workflow(*, workflow_id: str, workflow_folder_path: str = None, workflow_zip_file_path: str = None,
environment_list: str = None, host_list: str = None, password_list: str = None,
sync_path: os.PathLike = None):
"""
Usage: <main class> run workflow [-d=<workflowFolderPath>]
[-f=<workflowZipPath>] [-e=<envs>]...
Expand All @@ -34,31 +85,32 @@ def run_worklfow(*, workflow_id: str, workflow_folder_path: str=None, workflow_z
"""
download_geoweaver_jar()

if sync_path:
from . import sync_workflow
sync_workflow(workflow_id=workflow_id, sync_to_path=sync_path)

if not workflow_id and not workflow_folder_path and not workflow_zip_file_path:
raise RuntimeError("Please provide at least one of the three options: workflow id, " \
raise RuntimeError("Please provide at least one of the three options: workflow id, "
"folder path or zip path")

if workflow_id and not workflow_folder_path and not workflow_zip_file_path:
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id,
"-e", environment_list,
"-h", host_list,
"-p", password_list],
cwd=f"{get_root_dir()}/")
command = [get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id]
if environment_list:
command.extend(["-e", environment_list])
command.extend(["-h", host_list, "-p", password_list])
subprocess.run(command, cwd=f"{get_root_dir()}/")

if workflow_folder_path and not workflow_zip_file_path:
# command to run workflow from folder
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id,
"-d", workflow_folder_path,
"-e", environment_list,
"-h", host_list,
"-p", password_list],
cwd=f"{get_root_dir()}/")
command = [get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id]
if environment_list:
command.extend(["-e", environment_list])
command.extend(["-d", workflow_folder_path, "-h", host_list, "-p", password_list])
subprocess.run(command, cwd=f"{get_root_dir()}/")

if not workflow_folder_path and workflow_zip_file_path:
subprocess.run([get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id,
"-e", environment_list,
"-f", workflow_zip_file_path,
"-h", host_list,
"-p", password_list],
cwd=f"{get_root_dir()}/")

command = [get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "run", "workflow", workflow_id]
if environment_list:
command.extend(["-e", environment_list])
command.extend(["-f", workflow_zip_file_path, "-h", host_list, "-p", password_list])
subprocess.run(command, cwd=f"{get_root_dir()}/")
38 changes: 38 additions & 0 deletions pygeoweaver/sc_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import json
import zipfile

import requests
import typing

from . import constants
from pygeoweaver.utils import download_geoweaver_jar, get_geoweaver_jar_path, get_java_bin_path, get_root_dir, \
copy_files


def sync_workflow(workflow_id: str, sync_to_path: typing.Union[str, os.PathLike]):
    """
    Sync a workflow's code and history from the server into a local folder.

    The workflow is requested from the ``/web/downloadworkflow`` endpoint,
    the resulting zip is unpacked into ``~/tmp``, and, if the ``id`` in the
    unpacked ``workflow.json`` equals the ``id`` in
    ``<sync_to_path>/workflow.json``, the unpacked files are copied over
    ``sync_to_path``. On an id mismatch a message is printed and nothing is
    copied.

    :param workflow_id: id of the workflow to download
    :param sync_to_path: local workflow folder (must contain ``workflow.json``)
        that receives the downloaded files
    :raises Exception: if ``sync_to_path`` is empty
    """
    # Validate the target first so a missing path fails before any download,
    # extraction or other disk work happens.
    if not sync_to_path:
        raise Exception("Please provide path to workflow that you wish to sync code and history")
    download_geoweaver_jar()
    # download workflow
    response_text = requests.post(
        f'{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/downloadworkflow',
        data={'id': workflow_id, 'option': 'workflowwithprocesscodeallhistory'},
    ).text
    # The last "/"-separated segment of the reply is used as the zip file name.
    # NOTE(review): presumably the server has written that archive into
    # ~/gw-workspace/temp on this machine - confirm.
    filename = response_text.rsplit('/')[-1]
    home_dir = os.path.expanduser("~")
    tmp_dir = os.path.join(home_dir, 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)
    # unzip the workflow
    with zipfile.ZipFile(os.path.join(home_dir, 'gw-workspace', 'temp', filename)) as ref:
        ref.extractall(tmp_dir)
    # check if target workflow path and the unzipped workflow match
    with open(os.path.join(tmp_dir, 'workflow.json'), "r") as imported:
        import_id = json.load(imported).get("id")
    with open(os.path.join(sync_to_path, "workflow.json"), "r") as target:
        sync_id = json.load(target).get("id")

    if import_id == sync_id:
        # if they match perform file replace
        copy_files(source_folder=tmp_dir, destination_folder=sync_to_path)
    else:
        print("Workflow ID mismatch, please check the `sync_to_path` path.")

Loading

0 comments on commit ab74bc2

Please sign in to comment.