-
Notifications
You must be signed in to change notification settings - Fork 3
/
eopfctl
executable file
·128 lines (105 loc) · 4.14 KB
/
eopfctl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
import json
import logging
import sys
from traceback import print_exc
import click
import ray
from eopf.algorithms import ProcessingContext
from eopf.core.computing.pool import (DistributedPool, LocalPool,
PoolDescription)
from eopf.core.production.triggering import import_algorithms, registry
@click.group()
def cli():
    # Root command group of the tool: it performs no work itself, the
    # sub-commands of this script register on it through ``@cli.command``.
    # These notes are deliberately ``#`` comments rather than a docstring,
    # because click uses a callback's docstring as help text when no explicit
    # ``help=`` is given, which would change the ``--help`` output.
    return None
@cli.command(help="List registered processing units")
def list():
    """Print every entry of ``registry.algorithms`` on STDOUT, one per line."""
    # NOTE: the function name is also the name of the CLI sub-command, which
    # is why it shadows the ``list`` builtin inside this module.
    for unit_name in registry.algorithms:
        print(unit_name)
schemahelp = """Prints processing unit json schema on STDOUT
\nArguments:
\n\t- ALGORITHM is the name of the processing unit
"""


@cli.command(help=schemahelp)
@click.argument("algorithm")
def schema(algorithm):
    """Dump the description and input/output JSON schemas of a processing unit.

    The result is a single JSON object written to STDOUT with the keys
    ``description`` (the unit's ``__doc__``), ``input`` and ``output`` (the
    ``json_schema()`` of the unit's input and output classes).

    Args:
        algorithm: name under which the processing unit is registered in
            ``registry.algorithms``.

    Raises:
        SystemExit: with status 1 when ``algorithm`` is not registered; the
            reason is reported on STDERR.
    """
    # Guard clause: report unknown names first so the happy path stays flat.
    if algorithm not in registry.algorithms:
        # The message must be an f-string, otherwise the literal text
        # "{algorithm}" is printed instead of the requested name.
        print(f"Error: {algorithm} does not exist", file=sys.stderr)
        print("Use list command to find available algorithms", file=sys.stderr)
        # sys.exit rather than the ``exit`` helper, which is only injected by
        # the ``site`` module and is meant for interactive sessions.
        sys.exit(1)

    algo = registry.algorithms[algorithm]
    payload = {
        "description": algo.__doc__,
        "input": algo.input_class().json_schema(),
        "output": algo.output_class().json_schema(),
    }
    json.dump(payload, sys.stdout)
describehelp = """Print the description of an processing unit on STDOUT
\nArguments:
\n\t- ALGORITHM is the name of the processing unit to describe
"""


@cli.command(help=describehelp)
@click.argument("algorithm")
def describe(algorithm):
    """Print the docstring of a registered processing unit on STDOUT.

    Args:
        algorithm: name under which the processing unit is registered in
            ``registry.algorithms``.

    Raises:
        SystemExit: with status 1 when ``algorithm`` is not registered; the
            reason is reported on STDERR.
    """
    if algorithm not in registry.algorithms:
        # The message must be an f-string, otherwise the literal text
        # "{algorithm}" is printed instead of the requested name.
        print(f"Error: {algorithm} does not exist", file=sys.stderr)
        print("Use list command to find available processing units", file=sys.stderr)
        # sys.exit rather than the ``exit`` helper, which is only injected by
        # the ``site`` module and is meant for interactive sessions.
        sys.exit(1)

    print(registry.algorithms[algorithm].__doc__)
runhelp = """Runs a processing unit
\nArguments:
\n\t- ALGORITHM is the name of the processing unit to run, to display available processing units one can use 'list' command
\n\t- INPUT_FILE is a json file containing processing unit input parameters
\n\t- OUTPUT_FILE is a json file where the result of the processing unit execution is written
\nIf an error occurs during the execution of the processing unit, exit status of the command is not zero and the error analysis is written in stderr
"""


@cli.command(help=runhelp)
@click.argument("algorithm")
@click.argument("input_file")
@click.argument("output_file")
@click.option(
    "--resources",
    default=None,
    help="a URL to a json file describing the resource pool configuration.\n"
    + "By default the algorithm is executed locally and uses the maximum available resources.",
)
def run(algorithm, input_file, output_file, resources):
    """Execute a registered processing unit and write its result as JSON.

    The input parameters are read from ``input_file`` and validated by the
    unit's input class. The unit is then run inside a ``ProcessingContext``
    backed by a ``LocalPool`` or, when ``resources`` is given, by a
    ``DistributedPool`` built from that pool description after connecting to
    ray. The unit's output is serialised with ``to_json()`` into
    ``output_file``.

    Args:
        algorithm: name under which the processing unit is registered in
            ``registry.algorithms``.
        input_file: path of the JSON file holding the unit's input parameters.
        output_file: path of the JSON file the result is written to.
        resources: optional path of a JSON pool description; it is opened
            with ``open()``, so it has to be readable as a local file.

    Raises:
        SystemExit: status 1 when ``algorithm`` is not registered, status 2
            when anything fails while preparing, running or cleaning up the
            processing unit (the traceback is printed on STDERR).
    """
    # Unknown names are rejected *before* the try block: inside it, the
    # SystemExit raised here used to be swallowed by the catch-all handler,
    # producing a spurious traceback and exit status 2 instead of 1.
    if algorithm not in registry.algorithms:
        print(f"Error: {algorithm} does not exist", file=sys.stderr)
        print("Use list command to find available processing units", file=sys.stderr)
        sys.exit(1)

    try:
        context = None
        try:
            algo = registry.algorithms[algorithm]
            with open(input_file) as fi:
                input_class = algo.input_class()
                param = input_class.from_json(fi.read(), validate=True)

            if resources:
                ray.init(
                    "auto",
                    logging_level=logging.CRITICAL,
                    configure_logging=True,
                    include_dashboard=False,
                )
                with open(resources) as fr:
                    pool_desc = PoolDescription.from_json(fr.read(), validate=True)
                context = ProcessingContext(DistributedPool(**pool_desc.to_dict()), None)
            else:
                context = ProcessingContext(LocalPool(), None)

            output = algo(context)(param)
            # Serialise before opening the output file so that a failing
            # to_json() does not leave a truncated/empty result file behind.
            serialized = output.to_json()
            with open(output_file, "w") as fo:
                fo.write(serialized)
        finally:
            # Release the pool and the ray session on failure as well as on
            # success; the nested finally keeps ray from staying connected
            # when closing the pool itself raises.
            try:
                if context is not None:
                    context.pool.close()
            finally:
                if ray.is_initialized():
                    ray.shutdown()
    except Exception:
        # Top-level boundary of the command: report the failure and turn it
        # into exit status 2. SystemExit and KeyboardInterrupt are not caught.
        print(
            f"An exception occurs running {algorithm} processing unit:", file=sys.stderr
        )
        print_exc(file=sys.stderr)
        sys.exit(2)
if __name__ == "__main__":
    # Script entry point: send log records to ``eopf_cli.log`` at INFO level,
    # then call ``import_algorithms()`` before handing control to click.
    # Presumably that call is what fills ``registry.algorithms`` - confirm
    # in eopf.core.production.triggering.
    logging.basicConfig(level=logging.INFO, filename="eopf_cli.log")
    import_algorithms()
    cli()
    # NOTE(review): in click's default standalone mode ``cli()`` ends the
    # process through ``sys.exit`` and does not return, so this call is not
    # expected to be reached - confirm whether it is still needed.
    ray.disconnect()