Confluent Platform with LDAP AuthZ and AuthZ

The goal of the example is to demonstrate authentication and authorization with LDAP and RBAC on the external listener. Confluent will be connected to an AD with domain and uses this as identity provider. Which permissions a user has is defined in Confluent MDS based on role bindings to groups.

IMPORTANT: This example requires the Confluent Enterprise edition, because RBAC and LDAP authentication are not suppored by the Open Source edition.

Note: The Kubernetes manifests are based on the Confluent Helm Charts.


This example requires infrastructure/ldap and infrastructure/ec2-windows to be installed.

Connetc via RDP to the Windows EC2 instanced for and create the following users and groups:


  • mds

  • kafka

  • kafkarest

  • schemaregistry

  • controlcenter

  • app_geysers


  • team_enceladus

Add user app_geysers to group team_enceladus.


Create the Kubernetes namespace for this example:
kubectl apply -f namespace.yaml


Deploy CLIs:
kubectl apply -f cli


Deploy ZooKeeper:
kubectl apply -f cluster/zookeeper.yaml


Create the Kubernetes secret for MDS token keypair which is used for token signing:
./btpl security/mds-token.btpl.yaml | kubectl apply -f -
Create the Kubernetes secret for MDS LDAP authentication:
export MDS_USERNAME='[email protected]'
export MDS_PASSWORD='my_mds_password'
./btpl security/mds-credentials.btpl.yaml | kubectl apply -f -
Create the Kubernetes secret for Kafka Rest MDS authentication:
export KAFKAREST_USERNAME='[email protected]'
export KAFKAREST_PASSWORD='my_kafkarest_password'
./btpl security/kafkarest-credentials.btpl.yaml | kubectl apply -f -
Deploy Kafka brokers:
kubectl apply -f cluster/kafka.yaml

Schema Registry

Create the Kubernetes secret for SchemaRegistry MDS authentication:
export SCHEMAREGISTRY_USERNAME='schemaregistry'
export SCHEMAREGISTRY_PASSWORD='my_schemaregistry_password'
./btpl security/schemaregistry-credentials.btpl.yaml | kubectl apply -f -
Deploy Schema Registry:
kubectl apply -f cluster/schemaregistry.yaml

Control Center

Create the Kubernetes secret for Control Center MDS authentication:
export CONTROLCENTER_USERNAME='controlcenter'
export CONTROLCENTER_PASSWORD='my_conrolcenter_password'
./btpl security/controlcenter-credentials.btpl.yaml | kubectl apply -f -
Deploy Control Center:
kubectl apply -f cluster/controlcenter.yaml
Forward port of Control Center to localhost:
kubectl -n confluent-ldap port-forward service/controlcenter 9021:80

Navigate to http://localhost:9021


Finaly, if you are done with everything, undeploy it:
kubectl delete -f clie
kubectl delete -f cluster
kubectl -n confluent-ldap delete secret -l
kubectl -n confluent-ldap delete pvc -l
kubectl delete -f namespace.yaml

Verify Kafka AuthN & AuthZ

Verify AuthN with LDAP credentials

Exec into the Kafka Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Create client config for kafka super user
export KAFKA_USERNAME=kafka
export KAFKA_PASSWORD='my_kafka_password'
cat > kafka.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${KAFKA_USERNAME}" \
List topics with kafka super user
kafka-topics --command-config kafka.config --bootstrap-server kafka:9092 --list

This command will lis tall topics.

Create client config for app_geysers user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${APP_USERNAME}" \
List topics with app_geysers user
kafka-topics --command-config app.config --bootstrap-server kafka:9092 --list

This is a valid user, but has no permissions. Therefore no topics are listed.

Create Kafka Role Bindings for Team Group

Exec into the Confluent Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Login with super user kafka
confluent login
Resolve Cluster Id
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
Create Role Bindings for group team_enceladus
confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperManage \
    --resource Topic:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperWrite \
    --resource Topic:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperRead \
    --resource Topic:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperManage \
    --resource Group:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperRead \
    --resource Group:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperWrite \
    --resource Group:enceladus_ \
    --prefix \
    --kafka-cluster-id $CLUSTER_ID
List created role bindings
confluent iam rbac role-binding list --kafka-cluster-id $CLUSTER_ID --principal Group:team_enceladus
Exec into the Kafka Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Create client config for app_geysers user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${APP_USERNAME}" \
List topics with app_geysers user
kafka-topics --command-config app.config --bootstrap-server kafka:9092 --list

This is a valid user, but has only permissions for topics prefixed with enceladus_.

Create an topic with name enceladus_app1
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
        --create --topic enceladus_app1 --replication-factor 3 --partitions 3
Try to create an topic with name europa_app1
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
        --create --topic europa_app1 --replication-factor 3 --partitions 3

The user app_geysers was only able to create the topic with the name enceladus_app1.

Publish a message to topic enceladus_app1
echo "test_message" | kafka-console-producer \
    --broker-list kafka:9092 \
    --topic enceladus_app1 \
    --producer.config app.config \
    --property parse.key=false
Consume a message from topic enceladus_app1 with consumer group enceladus_app1_cg
kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic enceladus_app1 \
    --group enceladus_app1_cg \
    --consumer.config app.config  \
    --from-beginning \
    --property parse.key=false \
    --max-messages 1

Verify Schema Registry AuthN & AuthZ

Verify AuthN with LDAP credentials

First exec into the Kafka Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Create client config for app_geysers user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${APP_USERNAME}" \
Create an topic with name enceladus_app2_avro
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
        --create --topic enceladus_app2_avro --replication-factor 3 --partitions 3
Exec into the Kafka Schema Registry Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Send request to Schema Registry without authentication
curl http://schemaregistry/subjects

This request should fail with error Unauthorized.

Send request to Schema Registry with authentication with LDAP user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects

The user should be able to authenticate and an empty list should be returned.

Create client config for app_geysers user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${APP_USERNAME}" \
Use the kafka-avro-console-producer to register a new schema and publish messages with app_geysers user
echo "A:{\"name\":\"Han Solo\",\"age\":30}" |
    kafka-avro-console-producer \
        --bootstrap-server kafka:9092 \
        --producer.config app.config \
        --topic enceladus_app2_avro \
        --sync \
        --request-required-acks -1 \
        --message-send-max-retries 3 \
        --property schema.registry.url=http://schemaregistry \
        --property basic.auth.credentials.source=USER_INFO \
        --property${APP_USERNAME}:${APP_PASSWORD} \
        --property parse.key=true \
        --property key.separator=: \
        --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
        --property value.schema="{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"

This request should fail with error message: User is denied operation Write on Subject: enceladus_app2_avro-value

Before we can run this command, the user must have write permissions to the corresponding schema registry subject. Therefore, lets first create the required role bindings.

Create Schema Registry Role Bindings for Team Group

Exec into the Confluent Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Login with super user kafka
confluent login
Resolve Cluster Id
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
export SCHEMA_REGISTRY_CLUSTER_ID="schemaregistry" # the schema-registry-group-id of the cluster
Create Role Bindings for group team_enceladus
confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperManage \
    --resource Subject:enceladus_ \
    --prefix \
    --kafka-cluster-id ${CLUSTER_ID}  \
    --schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperWrite \
    --resource Subject:enceladus_ \
    --prefix \
    --kafka-cluster-id ${CLUSTER_ID}  \
    --schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}

confluent iam rbac role-binding create \
    --principal Group:team_enceladus \
    --role DeveloperRead \
    --resource Subject:enceladus_ \
    --prefix \
    --kafka-cluster-id ${CLUSTER_ID}  \
    --schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
List created role bindings
confluent iam rbac role-binding list \
    --principal Group:team_enceladus \
    --kafka-cluster-id ${CLUSTER_ID} \
    --schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}

For more information about role bindings for schema registry see:

Exec into the Kafka Schema Registry Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Create client config for app_geysers user
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
security.protocol=SASL_PLAINTEXT required \
    username="${APP_USERNAME}" \
Use the kafka-avro-console-producer to register a new schema and publish messages with app_geysers user
echo "A:{\"name\":\"Han Solo\",\"age\":30}" |
    kafka-avro-console-producer \
        --bootstrap-server kafka:9092 \
        --producer.config app.config \
        --topic enceladus_app2_avro \
        --sync \
        --request-required-acks -1 \
        --message-send-max-retries 3 \
        --property schema.registry.url=http://schemaregistry \
        --property basic.auth.credentials.source=USER_INFO \
        --property${APP_USERNAME}:${APP_PASSWORD} \
        --property parse.key=true \
        --property key.separator=: \
        --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
        --property value.schema="{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"

Now the user is authorized to register a schema for the corresponding topic.

Use the kafka-avro-console-consumer to consume the publish messages with app_geysers user
kafka-avro-console-consumer \
    --bootstrap-server kafka:9092 \
    --consumer.config app.config \
    --topic enceladus_app2_avro \
    --group enceladus_app1_cg \
    --from-beginning \
    --max-messages 1 \
    --property schema.registry.url=http://schemaregistry \
    --property basic.auth.credentials.source=USER_INFO \
    --property${APP_USERNAME}:${APP_PASSWORD} \
    --property parse.key=true \
    --property key.separator=: \
    --property key.serializer=org.apache.kafka.common.serialization.StringSerializer
You can also request the schema directly from the schema registry via curl.
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects/enceladus_app2_avro-value/versions/1/schema

Verify Control Center AuthN & AuthZ

Verify AuthN with LDAP credentials

Navigate to http://localhost:9021

Now login with user app_geysers. This should work and you should be able to see all rescources like topics and schemas for which corresponding role bindinges exists for its group.

Next, try login with user kafka, which is a Kafka super user. This has been defined by starting Kafka with the following environment variable.

  value: User:admin;User:kafka;User:kafkarest;User:schemaregistry;User:controlcenter;User:ANONYMOUS

However, you will recognize that you have not even the permission to see the Kafka cluster itself. The reason for this is, that Control Center soley is based on RBAC for access controll. Therefore, in order to be able to manage all resources via Control Center, you must create the corresponding role bindings.

Create Role Bindings for kafka user

Exec into the Confluent Cli pod
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l -o name) bash
Login with super user kafka
confluent login
Resolve Cluster Id
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
export SCHEMA_REGISTRY_CLUSTER_ID="schemaregistry" # the schema-registry-group-id of the cluster
Create Role Bindings for user kafka
confluent iam rbac role-binding create \
    --principal User:kafka \
    --role SystemAdmin \
    --kafka-cluster-id ${CLUSTER_ID}

confluent iam rbac role-binding create \
    --principal User:kafka \
    --role SystemAdmin \
    --kafka-cluster-id ${CLUSTER_ID}  \
    --schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}

Navigate again to http://localhost:9021 and login with user kafka. Now you are able to view an manage any resource which exists in the Confluent Platform cluster.