Skip to content

Commit

Permalink
Update flink-yarn-application example to use multiple python files
Browse files Browse the repository at this point in the history
This closes #38.
  • Loading branch information
Sxnan authored Sep 21, 2023
1 parent 5603952 commit 40f274b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 14 deletions.
14 changes: 7 additions & 7 deletions flink-yarn-application/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ folder to run this example.
$ ./hadoop-3.3.6/bin/hdfs dfs -put venv.zip /venv.zip
$ rm -rf venv venv.zip
$ ./hadoop-3.3.6/bin/hdfs dfs -put data /data
$ ./hadoop-3.3.6/bin/hdfs dfs -put main.py /main.py
$ ./hadoop-3.3.6/bin/hdfs dfs -put code /code
```

3. Submit the Feathub job to Yarn cluster
Expand All @@ -89,7 +89,7 @@ folder to run this example.
-pyarch hdfs:///venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-pyfs hdfs:///main.py \
-pyfs hdfs:///code \
-pym main
```

Expand All @@ -108,11 +108,11 @@ folder to run this example.
The file should contain the following rows:

```
user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0
user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0
user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0
user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0
user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0
user_1,item_1,1,"2022-01-01 00:00:00",user_1item_1,100.0,100.0
user_1,item_2,2,"2022-01-01 00:01:00",user_1item_2,200.0,500.0
user_1,item_1,3,"2022-01-01 00:02:00",user_1item_1,200.0,1100.0
user_2,item_1,1,"2022-01-01 00:03:00",user_2item_1,300.0,300.0
user_1,item_3,2,"2022-01-01 00:04:00",user_1item_3,300.0,1200.0
```

5. Tear down the Yarn cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from feathub.feature_tables.sources.file_system_source import FileSystemSource
from feathub.table.schema import Schema

from transforms import concat_user_id_item_id_transform

if __name__ == "__main__":
client = FeathubClient(
props={
Expand Down Expand Up @@ -99,10 +101,17 @@
),
)

f_user_id_item_id = Feature(
name="user_id_item_id",
transform=concat_user_id_item_id_transform,
dtype=types.String,
)

purchase_events_with_features = DerivedFeatureView(
name="purchase_events_with_features",
source=purchase_events_source,
features=[
f_user_id_item_id,
"item_price_events.price",
f_total_payment_last_two_minutes,
],
Expand Down
22 changes: 22 additions & 0 deletions flink-yarn-application/code/transforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2022 The FeatHub Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from feathub.feature_views.transforms.python_udf_transform import PythonUdfTransform
from pandas import Series


def concat_user_id_item_id(row: Series) -> str:
return row["user_id"] + row["item_id"]


concat_user_id_item_id_transform = PythonUdfTransform(concat_user_id_item_id)
10 changes: 5 additions & 5 deletions flink-yarn-application/data/expected_output.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
user_1,item_1,1,"2022-01-01 00:00:00",100.0,100.0
user_1,item_2,2,"2022-01-01 00:01:00",200.0,500.0
user_1,item_1,3,"2022-01-01 00:02:00",200.0,1100.0
user_2,item_1,1,"2022-01-01 00:03:00",300.0,300.0
user_1,item_3,2,"2022-01-01 00:04:00",300.0,1200.0
user_1,item_1,1,"2022-01-01 00:00:00",user_1item_1,100.0,100.0
user_1,item_2,2,"2022-01-01 00:01:00",user_1item_2,200.0,500.0
user_1,item_1,3,"2022-01-01 00:02:00",user_1item_1,200.0,1100.0
user_2,item_1,1,"2022-01-01 00:03:00",user_2item_1,300.0,300.0
user_1,item_3,2,"2022-01-01 00:04:00",user_1item_3,300.0,1200.0
4 changes: 2 additions & 2 deletions flink-yarn-application/run_and_verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ zip -q -r venv.zip venv
./hadoop-3.3.6/bin/hdfs dfs -put venv.zip /venv.zip
rm -rf venv venv.zip
./hadoop-3.3.6/bin/hdfs dfs -put data /data
./hadoop-3.3.6/bin/hdfs dfs -put main.py /main.py
./hadoop-3.3.6/bin/hdfs dfs -put code /code

echo Submitting Feathub job to Yarn cluster
curl -LO https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz
Expand All @@ -61,7 +61,7 @@ HADOOP_CLASSPATH=$(hadoop-3.3.6/bin/hadoop classpath) \
-pyarch hdfs:///venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-pyfs hdfs:///main.py \
-pyfs hdfs:///code \
-pym main

APPLICATION_ID=$(./hadoop-3.3.6/bin/yarn app -list | grep feathub_job | awk '{print $1}')
Expand Down

0 comments on commit 40f274b

Please sign in to comment.