feat(apf): add offset to consumer config to modify starting or ending timestamps #291

Merged · 7 commits · Nov 21, 2023
Changes from all commits
lc_classification_step/scripts/run_step.py (4 changes: 3 additions & 1 deletion)

@@ -6,7 +6,9 @@
 PACKAGE_PATH = os.path.abspath(os.path.join(SCRIPT_PATH, ".."))

 sys.path.append(PACKAGE_PATH)
-from settings import STEP_CONFIG
+from settings import settings_creator
+
+STEP_CONFIG = settings_creator()

 level = logging.INFO
 if os.getenv("LOGGING_DEBUG"):
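Replacing the STEP_CONFIG import with a settings_creator() call changes when the environment is read: at call time instead of at module import. A minimal sketch of the difference (CONSUMER_TOPICS is one of the step's real variables; everything else here is illustrative):

import os

# Import-time config: os.environ is read as soon as the module loads,
# so the environment must be populated before the first import.
# STEP_CONFIG = {"TOPICS": os.environ["CONSUMER_TOPICS"].strip().split(",")}

def settings_creator():
    # Call-time config: each call re-reads os.environ, so tests can set
    # or monkeypatch variables first and build a fresh config per test.
    return {"TOPICS": os.environ["CONSUMER_TOPICS"].strip().split(",")}

os.environ["CONSUMER_TOPICS"] = "topic_a,topic_b"
print(settings_creator())  # {'TOPICS': ['topic_a', 'topic_b']}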
lc_classification_step/settings.py (263 changes: 136 additions & 127 deletions)
@@ -7,146 +7,155 @@

This hunk rewrites nearly the whole file: the module-level CONSUMER_CONFIG, PRODUCER_CONFIG, SCRIBE_PRODUCER_CONFIG, METRICS_CONFIG and STEP_CONFIG dictionaries, plus the SASL blocks that mutate them, move inside a new settings_creator() function, with model_config_factory() hoisted above it. settings.py after the change:

from fastavro.schema import load_schema
from fastavro.repository.base import SchemaRepositoryError


def model_config_factory():
    modelclass = os.getenv("MODEL_CLASS")
    config = configurator(modelclass)
    return config


def settings_creator():
    CONSUMER_CONFIG = {
        "CLASS": os.getenv("CONSUMER_CLASS", "apf.consumers.KafkaConsumer"),
        "TOPICS": os.environ["CONSUMER_TOPICS"].strip().split(","),
        "PARAMS": {
            "bootstrap.servers": os.environ["CONSUMER_SERVER"],
            "group.id": os.environ["CONSUMER_GROUP_ID"],
            "auto.offset.reset": "beginning",
            "enable.partition.eof": bool(
                os.getenv("ENABLE_PARTITION_EOF", None)
            ),
        },
        "consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 0)),
        "consume.messages": int(os.getenv("CONSUME_MESSAGES", 1000)),
    }

    try:
        ELASTICC_SCHEMA = load_schema("schemas/output_elasticc.avsc")
    except SchemaRepositoryError:
        ELASTICC_SCHEMA = load_schema(
            "lc_classification_step/schemas/output_elasticc.avsc"
        )
    PRODUCER_CONFIG = {
        "TOPIC_STRATEGY": {
            "PARAMS": {
                "topic_format": os.environ["PRODUCER_TOPIC_FORMAT"],
                "date_format": os.environ["PRODUCER_DATE_FORMAT"],
                "change_hour": int(os.environ["PRODUCER_CHANGE_HOUR"]),
                "retention_days": int(os.environ["PRODUCER_RETENTION_DAYS"]),
            },
            "CLASS": os.getenv(
                "PRODUCER_TOPIC_STRATEGY_CLASS",
                "apf.core.topic_management.DailyTopicStrategy",
            ),
        },
        "PARAMS": {
            "bootstrap.servers": os.environ["PRODUCER_SERVER"],
        },
        "CLASS": os.getenv(
            "PRODUCER_CLASS", "apf.producers.kafka.KafkaProducer"
        ),
        "SCHEMA": SCHEMA
        if os.getenv("STREAM", "ztf") == "ztf"
        else ELASTICC_SCHEMA,
    }

    SCRIBE_PRODUCER_CONFIG = {
        "CLASS": "apf.producers.KafkaProducer",
        "PARAMS": {
            "bootstrap.servers": os.environ["SCRIBE_SERVER"],
        },
        "TOPIC": os.environ["SCRIBE_TOPIC"],
        "SCHEMA": SCRIBE_SCHEMA,
    }

    METRICS_CONFIG = {
        "CLASS": "apf.metrics.KafkaMetricsProducer",
        "EXTRA_METRICS": [{"key": "aid", "alias": "aid"}, {"key": "candid"}],
        "PARAMS": {
            "PARAMS": {
                "bootstrap.servers": os.environ["METRICS_HOST"],
            },
            "TOPIC": os.environ["METRICS_TOPIC"],
            "SCHEMA": {
                "type": "object",
                "required": ["timestamp_sent", "timestamp_received"],
                "properties": {
                    "timestamp_sent": {
                        "type": "string",
                        "description": "Timestamp sent refers to the time at which a message is sent.",
                        "default": "",
                        "examples": ["2020-09-01"],
                    },
                    "timestamp_received": {
                        "type": "string",
                        "description": "Timestamp received refers to the time at which a message is received.",
                        "default": "",
                        "examples": ["2020-09-01"],
                    },
                },
                "additionalProperties": True,
            },
        },
    }

    if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv(
        "CONSUMER_KAFKA_PASSWORD"
    ):
        CONSUMER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL"
        CONSUMER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
        CONSUMER_CONFIG["PARAMS"]["sasl.username"] = os.getenv(
            "CONSUMER_KAFKA_USERNAME"
        )
        CONSUMER_CONFIG["PARAMS"]["sasl.password"] = os.getenv(
            "CONSUMER_KAFKA_PASSWORD"
        )
    if os.getenv("PRODUCER_KAFKA_USERNAME") and os.getenv(
        "PRODUCER_KAFKA_PASSWORD"
    ):
        PRODUCER_CONFIG["PARAMS"]["security.protocol"] = os.getenv(
            "PRODUCER_SECURITY_PROTOCOL", "SASL_PLAINTEXT"
        )
        PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = os.getenv(
            "PRODUCER_SASL_MECHANISM", "SCRAM-SHA-256"
        )
        PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv(
            "PRODUCER_KAFKA_USERNAME"
        )
        PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv(
            "PRODUCER_KAFKA_PASSWORD"
        )
    if os.getenv("SCRIBE_KAFKA_USERNAME") and os.getenv(
        "SCRIBE_KAFKA_PASSWORD"
    ):
        SCRIBE_PRODUCER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL"
        SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
        SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv(
            "SCRIBE_KAFKA_USERNAME"
        )
        SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv(
            "SCRIBE_KAFKA_PASSWORD"
        )
    if os.getenv("METRICS_KAFKA_USERNAME") and os.getenv(
        "METRICS_KAFKA_PASSWORD"
    ):
        METRICS_CONFIG["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL"
        METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512"
        METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv(
            "METRICS_KAFKA_USERNAME"
        )
        METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv(
            "METRICS_KAFKA_PASSWORD"
        )
    STEP_CONFIG = {
        "PROMETHEUS": bool(os.getenv("USE_PROMETHEUS", True)),
        "SCRIBE_PRODUCER_CONFIG": SCRIBE_PRODUCER_CONFIG,
        "CONSUMER_CONFIG": CONSUMER_CONFIG,
        "PRODUCER_CONFIG": PRODUCER_CONFIG,
        "METRICS_CONFIG": METRICS_CONFIG,
        "MODEL_VERSION": os.getenv("MODEL_VERSION", "dev"),
        "MODEL_CONFIG": model_config_factory(),
        "SCRIBE_PARSER_CLASS": os.getenv("SCRIBE_PARSER_CLASS"),
        "STEP_PARSER_CLASS": os.getenv("STEP_PARSER_CLASS"),
    }
    return STEP_CONFIG
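One consequence worth noting: each SASL branch fires only when both the username and password variables are present, layering credentials onto the PARAMS dict built above. A hedged sketch of that merge pattern in isolation (keys and variable names mirror the consumer block; the server value is a placeholder):

import os

params = {"bootstrap.servers": "localhost:9092", "group.id": "lc"}

if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv("CONSUMER_KAFKA_PASSWORD"):
    # Credentials are layered on only when both are set; a username
    # without a password leaves the config on its plaintext defaults.
    params["security.protocol"] = "SASL_SSL"
    params["sasl.mechanism"] = "SCRAM-SHA-512"
    params["sasl.username"] = os.environ["CONSUMER_KAFKA_USERNAME"]
    params["sasl.password"] = os.environ["CONSUMER_KAFKA_PASSWORD"]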
lc_classification_step/tests/integration/conftest.py (16 changes: 11 additions & 5 deletions)

@@ -87,6 +87,8 @@ def kafka_service(docker_ip, docker_services):

 @pytest.fixture
 def env_variables_ztf():
+    envcopy = os.environ.copy()
+
     def set_env_variables():
         random_string = uuid.uuid4().hex
         env_variables_dict = {
@@ -112,11 +114,14 @@ def set_env_variables():
         for key, value in env_variables_dict.items():
             os.environ[key] = value

-    return set_env_variables
+    yield set_env_variables
+    os.environ = envcopy


 @pytest.fixture
 def env_variables_elasticc():
+    envcopy = os.environ.copy()
+
     def set_env_variables(
         model: str,
         model_class: str,
@@ -147,7 +152,8 @@ def set_env_variables(
         for key, value in env_variables_dict.items():
             os.environ[key] = value

-    return set_env_variables
+    yield set_env_variables
+    os.environ = envcopy


 @pytest.fixture
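Replacing return with yield turns these into fixtures with teardown: pytest runs everything after the yield once the test finishes, so the os.environ snapshot taken at the top of the fixture is restored and variables set by one test cannot leak into the next. A generic sketch of the pattern (not the PR's fixture; it restores in place with clear()/update(), a slightly more conservative variant than rebinding os.environ as the diff does):

import os
import pytest

@pytest.fixture
def clean_env():
    envcopy = os.environ.copy()  # snapshot before the test mutates it
    yield                        # the test body runs here
    os.environ.clear()           # teardown: drop test-set variables
    os.environ.update(envcopy)   # and restore the snapshot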
@@ -185,7 +191,7 @@ def _produce_messages(topic, SCHEMA):
         producer.produce(message)


-@pytest.fixture(scope="session")
+@pytest.fixture
 def kafka_consumer():
     def factory(
         stream: str,
@@ -212,14 +218,14 @@ def factory(
     return factory


-@pytest.fixture(scope="session")
+@pytest.fixture
 def scribe_consumer():
     def factory():
         consumer = KafkaConsumer(
             {
                 "PARAMS": {
                     "bootstrap.servers": "localhost:9092",
-                    "group.id": "test_step_",
+                    "group.id": f"test_step_{time.time()}",
                     "auto.offset.reset": "beginning",
                     "enable.partition.eof": True,
                 },
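Two related test-isolation fixes close out the file: dropping scope="session" makes pytest rebuild the consumer for every test, and suffixing the group.id with time.time() gives each run a Kafka consumer group with no committed offsets, so "auto.offset.reset": "beginning" actually rereads the topic from the start instead of resuming where a previous run left off. A sketch of the id generation (the helper name is illustrative):

import time

def unique_group_id(prefix: str = "test_step_") -> str:
    # A fresh group id has no committed offsets, so the consumer falls
    # back to auto.offset.reset and starts from the topic's beginning.
    return f"{prefix}{time.time()}"

print(unique_group_id())  # e.g. test_step_1700000000.123456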