-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsettings.py
131 lines (120 loc) · 5.2 KB
/
settings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
from fastavro import schema
from credentials import get_mongodb_credentials
##################################################
# lightcurve_step Settings File
##################################################
def settings_creator():
# Set the global logging level to debug
logging_debug = bool(os.getenv("LOGGING_DEBUG"))
db_config = get_mongodb_credentials(os.environ["MONGODB_SECRET_NAME"])
# Consumer configuration
# Each consumer has different parameters and can be found in the documentation
consumer_config = {
"CLASS": "apf.consumers.KafkaConsumer",
"PARAMS": {
"bootstrap.servers": os.environ["CONSUMER_SERVER"],
"group.id": os.environ["CONSUMER_GROUP_ID"],
"auto.offset.reset": "beginning",
"enable.partition.eof": True
if os.getenv("ENABLE_PARTITION_EOF")
else False,
},
"TOPICS": os.environ["CONSUMER_TOPICS"].split(","),
"consume.messages": int(os.getenv("CONSUME_MESSAGES", 50)),
"consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 0)),
}
producer_config = {
"CLASS": "apf.producers.KafkaProducer",
"PARAMS": {
"bootstrap.servers": os.environ["PRODUCER_SERVER"],
},
"TOPIC": os.environ["PRODUCER_TOPIC"],
"SCHEMA": schema.load_schema("schema.avsc"),
}
metrics_config = {
"CLASS": "apf.metrics.KafkaMetricsProducer",
"EXTRA_METRICS": [
{"key": "aid", "format": lambda x: str(x)},
],
"PARAMS": {
"PARAMS": {
"bootstrap.servers": os.environ["METRICS_SERVER"],
"auto.offset.reset": "smallest",
},
"TOPIC": os.getenv("METRICS_TOPIC", "metrics"),
"SCHEMA": {
"$schema": "http://json-schema.org/draft-07/schema",
"$id": "http://example.com/example.json",
"type": "object",
"title": "The root schema",
"description": "The root schema comprises the entire JSON document.",
"default": {},
"examples": [
{"timestamp_sent": "2020-09-01", "timestamp_received": "2020-09-01"}
],
"required": ["timestamp_sent", "timestamp_received"],
"properties": {
"timestamp_sent": {
"$id": "#/properties/timestamp_sent",
"type": "string",
"title": "The timestamp_sent schema",
"description": "Timestamp sent refers to the time at which a message is sent.",
"default": "",
"examples": ["2020-09-01"],
},
"timestamp_received": {
"$id": "#/properties/timestamp_received",
"type": "string",
"title": "The timestamp_received schema",
"description": "Timestamp received refers to the time at which a message is received.",
"default": "",
"examples": ["2020-09-01"],
},
},
"additionalProperties": True,
},
},
}
if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv("CONSUMER_KAFKA_PASSWORD"):
consumer_config["PARAMS"]["security.protocol"] = "SASL_SSL"
consumer_config["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
consumer_config["PARAMS"]["sasl.username"] = os.getenv(
"CONSUMER_KAFKA_USERNAME"
)
consumer_config["PARAMS"]["sasl.password"] = os.getenv(
"CONSUMER_KAFKA_PASSWORD"
)
if os.getenv("PRODUCER_KAFKA_USERNAME") and os.getenv("PRODUCER_KAFKA_PASSWORD"):
producer_config["PARAMS"]["security.protocol"] = "SASL_SSL"
producer_config["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
producer_config["PARAMS"]["sasl.username"] = os.getenv(
"PRODUCER_KAFKA_USERNAME"
)
producer_config["PARAMS"]["sasl.password"] = os.getenv(
"PRODUCER_KAFKA_PASSWORD"
)
if os.getenv("METRICS_KAFKA_USERNAME") and os.getenv("METRICS_KAFKA_PASSWORD"):
metrics_config["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL"
metrics_config["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
metrics_config["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv(
"METRICS_KAFKA_USERNAME"
)
metrics_config["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv(
"METRICS_KAFKA_PASSWORD"
)
prometheus = os.getenv("USE_PROMETHEUS", False)
use_profiling = bool(os.getenv("USE_PROFILING", True))
pyroscope_server = os.getenv("PYROSCOPE_SERVER", "http://pyroscope.pyroscope:4040")
# Step Configuration
step_config = {
"CONSUMER_CONFIG": consumer_config,
"PRODUCER_CONFIG": producer_config,
"METRICS_CONFIG": metrics_config,
"PROMETHEUS": prometheus,
"DB_CONFIG": db_config,
"LOGGING_DEBUG": logging_debug,
"USE_PROFILING": use_profiling,
"PYROSCOPE_SERVER": pyroscope_server
}
return step_config