-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rest-proxy: split out into standalone module #993
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
[run] | ||
branch = true | ||
relative_files = true | ||
source = src/karapace | ||
source = src/karapace,src/rest_proxy |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
#!/usr/bin/env bash | ||
|
||
retries=5 | ||
|
||
for ((i = 0; i <= retries; i++)); do | ||
response=$(curl --silent --verbose --fail http://localhost:8083/topics) | ||
|
||
if [[ $response == '["_schemas","__consumer_offsets"]' ]]; then | ||
echo "Ok!" | ||
break | ||
nosahama marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fi | ||
|
||
if ((i == retries)); then | ||
echo "Still failing after $i retries, giving up." | ||
exit 1 | ||
fi | ||
|
||
echo "Smoke test failed, retrying in 5 seconds ..." | ||
sleep 5 | ||
done |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,22 @@ rest) | |
echo "Starting Karapace REST API" | ||
exec python3 -m karapace.karapace_all /opt/karapace/rest.config.json | ||
;; | ||
rest_proxy) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The case |
||
# Reexport variables for compatibility | ||
[[ -n ${KARAPACE_REST_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REST_ADVERTISED_HOSTNAME}" | ||
[[ -n ${KARAPACE_REST_BOOTSTRAP_URI+isset} ]] && export KARAPACE_BOOTSTRAP_URI="${KARAPACE_REST_BOOTSTRAP_URI}" | ||
[[ -n ${KARAPACE_REST_REGISTRY_HOST+isset} ]] && export KARAPACE_REGISTRY_HOST="${KARAPACE_REST_REGISTRY_HOST}" | ||
[[ -n ${KARAPACE_REST_REGISTRY_PORT+isset} ]] && export KARAPACE_REGISTRY_PORT="${KARAPACE_REST_REGISTRY_PORT}" | ||
[[ -n ${KARAPACE_REST_HOST+isset} ]] && export KARAPACE_HOST="${KARAPACE_REST_HOST}" | ||
[[ -n ${KARAPACE_REST_PORT+isset} ]] && export KARAPACE_PORT="${KARAPACE_REST_PORT}" | ||
[[ -n ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE+isset} ]] && export KARAPACE_ADMIN_METADATA_MAX_AGE="${KARAPACE_REST_ADMIN_METADATA_MAX_AGE}" | ||
[[ -n ${KARAPACE_REST_LOG_LEVEL+isset} ]] && export KARAPACE_LOG_LEVEL="${KARAPACE_REST_LOG_LEVEL}" | ||
export KARAPACE_REST=1 | ||
echo "{}" >/opt/karapace/rest.config.json | ||
|
||
echo "Starting Karapace REST API" | ||
exec python3 -m rest_proxy /opt/karapace/rest.config.json | ||
;; | ||
registry) | ||
# Reexport variables for compatibility | ||
[[ -n ${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME}" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
""" | ||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from __future__ import annotations | ||
|
||
from aiohttp.web_log import AccessLogger | ||
from contextlib import closing | ||
from karapace import version as karapace_version | ||
from karapace.config import Config, read_config | ||
from karapace.instrumentation.prometheus import PrometheusInstrumentation | ||
from karapace.utils import DebugAccessLogger | ||
from rest_proxy import KafkaRest | ||
from typing import Final | ||
|
||
import argparse | ||
import logging | ||
import sys | ||
|
||
PROGRAM_NAME: Final[str] = "karapace_rest_proxy" | ||
|
||
|
||
def _configure_logging(*, config: Config) -> None: | ||
log_level = config.get("log_level", "DEBUG") | ||
log_format = config.get("log_format", "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s") | ||
|
||
root_handler: logging.Handler | None = None | ||
log_handler = config.get("log_handler", None) | ||
if "systemd" == log_handler: | ||
from systemd import journal | ||
|
||
root_handler = journal.JournalHandler(SYSLOG_IDENTIFIER="karapace") | ||
elif "stdout" == log_handler or log_handler is None: | ||
root_handler = logging.StreamHandler(stream=sys.stdout) | ||
else: | ||
logging.basicConfig(level=logging.INFO, format=log_format) | ||
logging.getLogger().setLevel(log_level) | ||
logging.warning("Log handler %s not recognized, root handler not set.", log_handler) | ||
|
||
if root_handler is not None: | ||
root_handler.setFormatter(logging.Formatter(log_format)) | ||
root_handler.setLevel(log_level) | ||
root_handler.set_name(name="karapace") | ||
logging.root.addHandler(root_handler) | ||
|
||
logging.root.setLevel(log_level) | ||
|
||
if config.get("access_logs_debug") is True: | ||
config["access_log_class"] = DebugAccessLogger | ||
logging.getLogger("aiohttp.access").setLevel(logging.DEBUG) | ||
else: | ||
config["access_log_class"] = AccessLogger | ||
|
||
|
||
def main() -> int: | ||
parser = argparse.ArgumentParser( | ||
prog=PROGRAM_NAME, | ||
description="Karapace rest proxy: exposes an API over common Kafka operations, your Kafka essentials in one tool", | ||
) | ||
parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__) | ||
parser.add_argument("config_file", help="configuration file path", type=argparse.FileType()) | ||
arg = parser.parse_args() | ||
|
||
with closing(arg.config_file): | ||
config = read_config(arg.config_file) | ||
Comment on lines
+64
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you aren't using the closing arg I don't think you are closing it correctly. with closing(foo) as bar:
config = read_config(bar) ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can take a look, once again, this is existing code 😉 |
||
|
||
logging.log(logging.INFO, "\n%s\\Co %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) | ||
|
||
_configure_logging(config=config) | ||
|
||
app = KafkaRest(config=config) | ||
|
||
logging.log(logging.INFO, "\n%s\nStarting %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) | ||
|
||
config_without_secrets = {} | ||
for key, value in config.items(): | ||
if "password" in key: | ||
value = "****" | ||
Comment on lines
+77
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is dangerous, you should keep the info about the secrecy of the field in the context of where its created. Otherwise if tomorrow we add another field like e.g.
In that way the filtering logic its "near" to where the addition/removal/renaming of the property is, its a property of the data structure rather than a piece of coupled code put randomly in another file and that the developer should know to look before adding/removing something far away. PS I think the logic wasn't covering the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once again, this is existing code 😉 |
||
config_without_secrets[key] = value | ||
logging.log(logging.DEBUG, "Config %r", config_without_secrets) | ||
|
||
try: | ||
PrometheusInstrumentation.setup_metrics(app=app) | ||
app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` | ||
except Exception as ex: # pylint: disable-broad-except | ||
app.stats.unexpected_exception(ex=ex, where=f"{PROGRAM_NAME}_main") | ||
raise | ||
|
||
return 0 | ||
|
||
|
||
if __name__ == "__main__": | ||
sys.exit(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would
__consumer_offsets
be enough here, the_schemas
is schema registry special.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think testing against
_schemas
adds a little bit more coverage as that topic is created from application code and truly verifies the flow.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also this is existing code/logic.