-
Notifications
You must be signed in to change notification settings - Fork 19
/
local_parameter_example.py
115 lines (101 loc) · 3.75 KB
/
local_parameter_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# [start workflow_declare]
r"""
A tutorial example set local parameter in pydolphinscheduler.
Method 1:
task = Shell(..., input_params={"input":"a"}, output_params={"output": "b"})
Method 2:
task = Shell(...)
task.add_in("input", "a")
task.add_out("output", "b")
"""
from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
with Workflow(name="local_parameter_example", release_state="offline") as workflow:
# [start parameter example]
# define a parameter "a", and use it in Shell task
example1_input_params = Shell(
name="example1_input_params",
command="echo ${a}",
input_params={
"a": "123",
},
)
# define a parameter "random_value", and pass it to the downstream tasks
example2_output_params = Shell(
name="example2_output_params",
command="""
val=$(echo $RANDOM)
echo "#{setValue(random_value=${val})}"
echo $val
""",
output_params={
"random_value": "",
},
)
# use the parameter "random_value", from upstream tasks
# we don't need to define input_params again if the parameter is from upstram tasks
example2_input_params = Shell(
name="example2_input_params", command="""echo ${random_value}"""
)
example2_output_params >> example2_input_params
# [end parameter example]
# [start parameter define]
# Add parameter via task arguments
task_1 = Shell(
name="task_1",
command="echo hello pydolphinscheduler",
input_params={
"value_VARCHAR": "abc",
"value_INTEGER": 123,
"value_FLOAT": 0.1,
"value_BOOLEAN": True,
},
output_params={
"value_EMPTY": None,
},
)
# Add parameter via task instance's method
task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")
task_2.add_in("value_VARCHAR", "abc")
task_2.add_in("value_INTEGER", 123)
task_2.add_in("value_FLOAT", 0.1)
task_2.add_in("value_BOOLEAN", True)
task_2.add_out("value_EMPTY")
# Task 1 is the same as task 2
# Others parameter types which cannot be converted automatically, must declare type explicitly
task_3 = Shell(
name="task_3",
command="echo '123' >> test.txt",
input_params={
"value_LONG": ParameterType.LONG("1000000"),
"value_DATE": ParameterType.DATE("2022-01-02"),
"value_TIME": ParameterType.TIME("2022-01-01"),
"value_TIMESTAMP": ParameterType.TIMESTAMP(123123124125),
"value_LIST": ParameterType.LIST("123123"),
},
output_params={
"output_INTEGER": ParameterType.INTEGER(100),
"output_LIST": ParameterType.LIST(),
"output_FILE": ParameterType.FILE("test.txt"),
},
)
workflow.submit()
# [end parameter define]
# [end workflow_declare]