Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ZihengSun committed Jun 14, 2023
2 parents 2fde1cd + af40796 commit 9d82691
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 60 deletions.
12 changes: 12 additions & 0 deletions pygeoweaver/sc_detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"""

import subprocess

import requests
from . import constants

from pygeoweaver.utils import (
download_geoweaver_jar,
get_geoweaver_jar_path,
Expand Down Expand Up @@ -57,3 +61,11 @@ def detail_host(host_id):
],
cwd=f"{get_root_dir()}/",
)


def get_process_code(process_id):
r = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/detail",
data={"type": "process", "id": process_id},
).json()
print(["code"])
13 changes: 12 additions & 1 deletion pygeoweaver/sc_export.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os.path
import zipfile
import subprocess
from pygeoweaver.utils import (
download_geoweaver_jar,
Expand All @@ -7,7 +9,9 @@
)


def export_workflow(workflow_id, mode, target_file_path):
def export_workflow(
workflow_id, mode=4, target_file_path=None, unzip=False, unzip_directory_name=None
):
"""
Usage: <main class> export workflow [--mode=<export_mode>] <workflow_id>
<target_file_path>
Expand Down Expand Up @@ -37,3 +41,10 @@ def export_workflow(workflow_id, mode, target_file_path):
],
cwd=f"{get_root_dir()}/",
)
if unzip:
if not unzip_directory_name:
raise Exception("Please provide unzip directory name")
with zipfile.ZipFile(target_file_path, "r") as zip_ref:
zip_ref.extractall(
os.path.join(os.path.dirname(target_file_path), unzip_directory_name)
)
54 changes: 54 additions & 0 deletions pygeoweaver/sc_find.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import requests
from . import constants
import pandas as pd


def get_process_by_name(process_name):
response = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/list", data={"type": "process"}
)
process_list = response.json()

matching_processes = []

for process in process_list:
if process["name"] == process_name:
matching_processes.append(process)
pd.set_option("display.max_columns", None) # Display all columns
pd.set_option("display.max_rows", None) # Display all rows
pd.set_option("display.expand_frame_repr", False) # Prevent truncation of columns
pd.DataFrame(matching_processes)


def get_process_by_id(process_id):
response = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/list", data={"type": "process"}
)
process_list = response.json()

matching_processes = []

for process in process_list:
if process["id"] == process_id:
matching_processes.append(process)
pd.set_option("display.max_columns", None) # Display all columns
pd.set_option("display.max_rows", None) # Display all rows
pd.set_option("display.expand_frame_repr", False) # Prevent truncation of columns
pd.DataFrame(matching_processes)


def get_process_by_language(language):
response = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/list", data={"type": "process"}
)
process_list = response.json()

matching_processes = []

for process in process_list:
if process["lang"] == language:
matching_processes.append(process)
pd.set_option("display.max_columns", None) # Display all columns
pd.set_option("display.max_rows", None) # Display all rows
pd.set_option("display.expand_frame_repr", False) # Prevent truncation of columns
pd.DataFrame(matching_processes)
4 changes: 3 additions & 1 deletion pygeoweaver/sc_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from pygeoweaver.utils import get_geoweaver_jar_path, get_java_bin_path, get_root_dir


def helpwith(command_list: list = [],):
def helpwith(
command_list: list = [],
):
target_cmd_args = [get_java_bin_path(), "-jar", get_geoweaver_jar_path()]
if len(command_list) > 0:
for i in range(len(command_list) - 1):
Expand Down
10 changes: 8 additions & 2 deletions pygeoweaver/sc_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ def get_process_history(process_id):
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/logs",
data={"type": "process", "id": process_id},
).json()
return pd.DataFrame(r)
df = pd.DataFrame(r)
df["history_begin_time"] = pd.to_datetime(df["history_begin_time"], unit="ms")
df["history_end_time"] = pd.to_datetime(df["history_end_time"], unit="ms")
return df
except Exception as e:
subprocess.run(
f"{get_java_bin_path()} -jar {get_geoweaver_jar_path()} process-history {process_id}",
Expand All @@ -59,7 +62,10 @@ def get_workflow_history(workflow_id):
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/logs",
data={"type": "workflow", "id": workflow_id},
).json()
return pd.DataFrame(r)
df = pd.DataFrame(r)
df["history_begin_time"] = pd.to_datetime(df["history_begin_time"], unit="ms")
df["history_end_time"] = pd.to_datetime(df["history_end_time"], unit="ms")
return df
except Exception as e:
subprocess.run(
f"{get_java_bin_path()} -jar {get_geoweaver_jar_path()} workflow-history {workflow_id}",
Expand Down
10 changes: 5 additions & 5 deletions pygeoweaver/sc_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

def import_workflow(workflow_zip_file_path):
"""
Usage: <main class> import workflow <workflow_zip_file_path>
import a workflow from file
<workflow_zip_file_path>
Geoweaver workflow zip file path
"""
Usage: <main class> import workflow <workflow_zip_file_path>
import a workflow from file
<workflow_zip_file_path>
Geoweaver workflow zip file path
"""
if not workflow_zip_file_path:
raise RuntimeError("Workflow zip file path is missing")
download_geoweaver_jar()
Expand Down
1 change: 1 addition & 0 deletions pygeoweaver/sc_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
from pygeoweaver.sc_help import *
from pygeoweaver.sc_create import *
from pygeoweaver.sc_sync import *
from pygeoweaver.sc_find import *
22 changes: 22 additions & 0 deletions pygeoweaver/sc_list.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import json

import requests
import subprocess
from . import constants
from pygeoweaver.utils import (
download_geoweaver_jar,
get_geoweaver_jar_path,
get_java_bin_path,
get_root_dir,
check_ipython,
)
import pandas as pd


def list_hosts():
Expand All @@ -24,6 +30,22 @@ def list_processes():
)


def list_processes_in_workflow(workflow_id):
download_geoweaver_jar()
payload = {"id": workflow_id, "type": "workflow"}
r = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/detail", data=payload
)
nodes = json.loads(r.json()["nodes"])
result = [
{"title": item["title"], "id": item["id"].split(".")[0]} for item in nodes
]

if check_ipython():
return pd.DataFrame(result)
return result


def list_workflows():
download_geoweaver_jar()
subprocess.run(
Expand Down
13 changes: 9 additions & 4 deletions pygeoweaver/sc_resetpassword.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@

def reset_password():
"""
Usage: <main class> resetpassword
Reset password for localhost
"""
Usage: <main class> resetpassword
Reset password for localhost
"""
download_geoweaver_jar()
subprocess.run(
[get_java_bin_path(), "-jar", get_geoweaver_jar_path(), "resetpassword",]
[
get_java_bin_path(),
"-jar",
get_geoweaver_jar_path(),
"resetpassword",
]
)
80 changes: 36 additions & 44 deletions pygeoweaver/sc_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

import requests

from . import constants
from pygeoweaver.utils import (
download_geoweaver_jar,
get_geoweaver_jar_path,
get_java_bin_path,
get_root_dir,
)
from . import constants


def run_process(
Expand All @@ -33,43 +33,25 @@ def run_process(
if password is None:
# prompt to ask for password
password = getpass.getpass(f"Enter password for host - {host_id}: ")

if sync_path:
ext, matching_dict = None, None
process_file = os.path.exists(os.path.join(sync_path, "code", "process.json"))
if not process_file:
print("process file does not exists, please check the path")
return
p_file = json.loads(
open(os.path.join(sync_path, "code", "process.json"), "r").read()
)
for item in p_file:
if item.get("id") == process_id:
matching_dict = item
break
if not matching_dict:
print("Could not find the file, please check the path")
return
if matching_dict["lang"] == "python":
ext = ".py"
if matching_dict["lang"] == "bash":
ext = ".bash"
if not ext:
print("Invalid file format.")
source_filename = matching_dict["name"] + ext
source_file_exists = os.path.exists(
os.path.join(sync_path, "code", source_filename)
if not os.path.exists(sync_path):
print("The specified path does nto exists")
print("Updating code on workflow with the given file path.\n")
f = open(sync_path, "r")
context = f.read()
f.close()
details = requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/detail",
data={"type": "process", "id": process_id},
).json()
details["code"] = context
requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/edit/process",
data=json.dumps(details),
headers={"Content-Type": "application/json"},
)
if source_file_exists:
f = open(os.path.join(sync_path, "code", source_filename), "r").read()
matching_dict["code"] = f
requests.post(
f"{constants.GEOWEAVER_DEFAULT_ENDPOINT_URL}/web/edit/process",
data=json.dumps(matching_dict),
headers={"Content-Type": "application/json"},
)
else:
print("File does not exists")

download_geoweaver_jar()
subprocess.run(
[
Expand Down Expand Up @@ -109,7 +91,7 @@ def run_workflow(
-f, --workflow-zip-file-path=<workflowZipPath>
workflow package or path to workflow zip to run
-h, --hosts=<hostStrings> hosts to run on. list of host ids with comma as separator.
-p, --passwords=<passes> passwords to the target hosts. list of passwords with comma as separator.
-p, --passwords=<passes> passwords to the target hosts. list of passwords with comma as separator.
"""
download_geoweaver_jar()

Expand All @@ -122,6 +104,7 @@ def run_workflow(
elif len(password_list.split(",")) != len(host_list.split(",")):
raise RuntimeError("The password list length doesn't match host list")

password_list = ",".join(password_list)
if sync_path:
from . import sync_workflow

Expand All @@ -141,10 +124,13 @@ def run_workflow(
"run",
"workflow",
workflow_id,
"-h",
host_list,
"-p",
password_list,
]
if environment_list:
command.extend(["-e", environment_list])
command.extend(["-h", host_list, "-p", ",".join(password_list)])
subprocess.run(command, cwd=f"{get_root_dir()}/")

if workflow_folder_path and not workflow_zip_file_path:
Expand All @@ -156,12 +142,15 @@ def run_workflow(
"run",
"workflow",
workflow_id,
"-d",
workflow_folder_path,
"-h",
host_list,
"-p",
password_list
]
if environment_list:
command.extend(["-e", environment_list])
command.extend(
["-d", workflow_folder_path, "-h", host_list, "-p", password_list]
)
subprocess.run(command, cwd=f"{get_root_dir()}/")

if not workflow_folder_path and workflow_zip_file_path:
Expand All @@ -172,10 +161,13 @@ def run_workflow(
"run",
"workflow",
workflow_id,
"-f",
workflow_zip_file_path,
"-h",
host_list,
"-p",
password_list,
]
if environment_list:
command.extend(["-e", environment_list])
command.extend(
["-f", workflow_zip_file_path, "-h", host_list, "-p", password_list]
)
subprocess.run(command, cwd=f"{get_root_dir()}/")
Loading

0 comments on commit 9d82691

Please sign in to comment.