-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
65 lines (50 loc) · 2.57 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from event_detection import EventDetection
from flink_io.measurements_source import create_measurements_env
from flink_io.sink_jdbc import create_event_sink_table, create_values_sink_table
from flink_io.tag_config_source import create_config_env
from flink_types.joined_inputs import JoinedMeasurements
from flink_types.processed_measurements import ProcessedMeasurements
# Directory containing this file; connector jars are expected under ./lib.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))

# Connector jars the job needs at runtime, expressed as file:// URLs in the
# semicolon-separated format expected by the `pipeline.jars` Flink option.
jars = [
    "flink-sql-connector-kinesis_2.12-1.13.2.jar",
    "flink-connector-jdbc_2.12-1.13.2.jar",
    "postgresql-42.2.12.jar",
]
jar_string_config = ";".join(f"file:///{CURRENT_DIR}/lib/{jar}" for jar in jars)
def main():
    """Build and run the Flink streaming job.

    Reads measurements and tag configuration, joins and filters them,
    runs event detection per group, and writes the results into the
    JDBC event/value sink tables.
    """
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(stream_execution_environment=exec_env)
    table_env.get_config().get_configuration().set_string("pipeline.jars", jar_string_config)

    # Register the streaming source tables on the table environment.
    table_env = create_measurements_env(table_env)
    table_env = create_config_env(table_env)

    # Handles to the registered source tables.
    tag_config = table_env.from_path("config")
    raw_measurements = table_env.from_path("measurements")

    # Keep only rows whose quality flag `q` is truthy, drop the flag, and
    # enrich each measurement with its matching tag configuration row.
    enriched = (
        raw_measurements.where(raw_measurements.q)
        .drop_columns(raw_measurements.q)
        .join(tag_config)
        .where(raw_measurements.id == tag_config.tag_uuid)
    )

    # Switch from the Table API to the DataStream API, key by group,
    # and run the event-detection operator.
    detected = (
        table_env.to_append_stream(enriched, type_info=JoinedMeasurements().flink_types())
        .key_by(lambda row: row.group_id)
        .flat_map(EventDetection(), ProcessedMeasurements().flink_types())
    )

    # Expose the processed stream as a view so SQL DML can read it.
    table_env.create_temporary_view("processed_measurements", table_env.from_data_stream(detected))

    # Register the JDBC sink tables.
    events_table = create_event_sink_table(table_env)
    values_table = create_values_sink_table(table_env)

    # Route event-type and value-type rows to their respective sinks; both
    # inserts go into one statement set so they execute as a single job.
    events_dml = f"""INSERT INTO {events_table} SELECT state_config_id, result_value, ts_start, ts_end FROM processed_measurements WHERE category = 'event_type' """
    values_dml = f"""INSERT INTO {values_table} SELECT value_config_id, events_id, result_value, ts_start, value_count FROM processed_measurements WHERE category = 'value_type' """
    statements = table_env.create_statement_set()
    statements.add_insert_sql(events_dml)
    statements.add_insert_sql(values_dml)
    # Block until the streaming job finishes (runs indefinitely for
    # unbounded sources).
    statements.execute().wait()


if __name__ == "__main__":
    main()