From 12f9cc03eb007af431fc6eb8d2ecd02c2976f370 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Fri, 5 May 2023 09:30:20 -0400 Subject: [PATCH] Remove oauth_cb workaround for KIP-768 support Fixes #49. --- pyproject.toml | 2 -- src/gcn_kafka/core.py | 12 ------------ src/gcn_kafka/oidc.py | 30 ------------------------------ test/test_oidc.py | 33 --------------------------------- 4 files changed, 77 deletions(-) delete mode 100644 src/gcn_kafka/oidc.py delete mode 100644 test/test_oidc.py diff --git a/pyproject.toml b/pyproject.toml index bd65f04..1414e67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,10 +20,8 @@ classifiers = [ "Topic :: System :: Networking" ] dependencies = [ - "authlib", "certifi", "confluent-kafka >= 2.2.0", - "requests", ] requires-python = ">=3.9" dynamic = [ "version" ] diff --git a/src/gcn_kafka/core.py b/src/gcn_kafka/core.py index c350088..e72e244 100644 --- a/src/gcn_kafka/core.py +++ b/src/gcn_kafka/core.py @@ -7,8 +7,6 @@ import confluent_kafka import confluent_kafka.admin -from .oidc import set_oauth_cb - def get_config(mode, config, **kwargs): # Merge configuration from user. @@ -46,7 +44,6 @@ def get_config(mode, config, **kwargs): if mode == "producer": config.setdefault("compression.type", "zstd") - set_oauth_cb(config) return config @@ -81,9 +78,6 @@ def __init__( **kwargs, ) ) - # Workaround for https://github.com/confluentinc/librdkafka/issues/3753#issuecomment-1058272987. - # FIXME: Remove once fixed upstream, or on removal of oauth_cb. - self.poll(0) class Consumer(confluent_kafka.Consumer): @@ -111,9 +105,6 @@ def __init__( **kwargs, ) ) - # Workaround for https://github.com/confluentinc/librdkafka/issues/3753#issuecomment-1058272987. - # FIXME: Remove once fixed upstream, or on removal of oauth_cb. - self.poll(0) class AdminClient(confluent_kafka.admin.AdminClient): @@ -141,6 +132,3 @@ def __init__( **kwargs, ) ) - # Workaround for https://github.com/confluentinc/librdkafka/issues/3753#issuecomment-1058272987. - # FIXME: Remove once fixed upstream, or on removal of oauth_cb. - self.poll(0) diff --git a/src/gcn_kafka/oidc.py b/src/gcn_kafka/oidc.py deleted file mode 100644 index 4a985ba..0000000 --- a/src/gcn_kafka/oidc.py +++ /dev/null @@ -1,30 +0,0 @@ -# SPDX-License-Identifier: CC0-1.0 - -from authlib.integrations.requests_client import OAuth2Session - - -def set_oauth_cb(config): - """Implement client support for KIP-768 OpenID Connect. - - Apache Kafka 3.1.0 supports authentication using OpenID Client Credentials. - Native support for Python is still incomplete due to this issue: - https://github.com/confluentinc/librdkafka/issues/3751 - - Meanwhile, this is a pure Python implementation of the refresh token - callback. - """ - if config.pop("sasl.oauthbearer.method", None) != "oidc": - return - - client_id = config.pop("sasl.oauthbearer.client.id") - client_secret = config.pop("sasl.oauthbearer.client.secret", None) - scope = config.pop("sasl.oauthbearer.scope", None) - token_endpoint = config.pop("sasl.oauthbearer.token.endpoint.url") - - session = OAuth2Session(client_id, client_secret, scope=scope) - - def oauth_cb(*_, **__): - token = session.fetch_token(token_endpoint, grant_type="client_credentials") - return token["access_token"], token["expires_at"] - - config["oauth_cb"] = oauth_cb diff --git a/test/test_oidc.py b/test/test_oidc.py deleted file mode 100644 index d33b3ef..0000000 --- a/test/test_oidc.py +++ /dev/null @@ -1,33 +0,0 @@ -from unittest.mock import MagicMock - -from gcn_kafka import oidc - - -def test_no_oidc(): - config = {} - oidc.set_oauth_cb(config) - assert config == {} - - -def test_oidc(monkeypatch): - mock_session_class = MagicMock() - monkeypatch.setattr(oidc, "OAuth2Session", mock_session_class) - - config = { - "sasl.oauthbearer.method": "oidc", - "sasl.oauthbearer.client.id": "client_id", - "sasl.oauthbearer.client.secret": "client_secret", - "sasl.oauthbearer.scope": "scope", - "sasl.oauthbearer.token.endpoint.url": "token_endpoint", - } - oidc.set_oauth_cb(config) - - oauth_cb = config.pop("oauth_cb") - assert config == {} - mock_session_class.assert_called_once_with( - "client_id", "client_secret", scope="scope" - ) - oauth_cb() - mock_session_class.return_value.fetch_token.assert_called_once_with( - "token_endpoint", grant_type="client_credentials" - )