The goal of this example is to demonstrate authentication and authorization with LDAP and RBAC on the external listener. Confluent is connected to an Active Directory with the domain ada.letuscode.xyz and uses it as the identity provider. The permissions a user has are defined in Confluent MDS based on role bindings to groups.
IMPORTANT: This example requires the Confluent Enterprise edition, because RBAC and LDAP authentication are not supported by the Open Source edition.
Note: The Kubernetes manifests are based on the Confluent Helm Charts.
This example requires infrastructure/ldap and infrastructure/ec2-windows to be installed.
Connect via RDP to the Windows EC2 instance for ada.letuscode.xyz and create the following users and groups:
ou=Users,ou=ada,dc=ada,dc=letuscode,dc=xyz:
- mds
- kafka
- kafkarest
- schemaregistry
- controlcenter
- app_geysers
ou=Groups,ou=ada,dc=ada,dc=letuscode,dc=xyz:
- team_enceladus
Add user app_geysers to group team_enceladus.
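If you prefer scripting the directory entries over the AD GUI, they can be sketched as LDIF. This is only a sketch: the objectClass and attribute names below are assumptions and must match your AD schema, and AD additionally requires a secure connection to set passwords.

```shell
# Sketch of one user and the group as LDIF (attribute names are assumptions;
# adjust to your AD schema before importing with e.g. ldifde or ldapadd).
cat > ada-users.ldif << 'EOF'
dn: cn=app_geysers,ou=Users,ou=ada,dc=ada,dc=letuscode,dc=xyz
objectClass: user
cn: app_geysers
sAMAccountName: app_geysers

dn: cn=team_enceladus,ou=Groups,ou=ada,dc=ada,dc=letuscode,dc=xyz
objectClass: group
cn: team_enceladus
member: cn=app_geysers,ou=Users,ou=ada,dc=ada,dc=letuscode,dc=xyz
EOF
# Quick sanity check: two entries were written.
grep -c '^dn:' ada-users.ldif
```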
kubectl apply -f namespace.yaml
./create-keys.sh
./btpl security/mds-token.btpl.yaml | kubectl apply -f -
export MDS_USERNAME='[email protected]'
export MDS_PASSWORD='my_mds_password'
./btpl security/mds-credentials.btpl.yaml | kubectl apply -f -
export KAFKAREST_USERNAME='[email protected]'
export KAFKAREST_PASSWORD='my_kafkarest_password'
./btpl security/kafkarest-credentials.btpl.yaml | kubectl apply -f -
kubectl apply -f cluster/kafka.yaml
export SCHEMAREGISTRY_USERNAME='schemaregistry'
export SCHEMAREGISTRY_PASSWORD='my_schemaregistry_password'
./btpl security/schemaregistry-credentials.btpl.yaml | kubectl apply -f -
kubectl apply -f cluster/schemaregistry.yaml
export CONTROLCENTER_USERNAME='controlcenter'
export CONTROLCENTER_PASSWORD='my_controlcenter_password'
./btpl security/controlcenter-credentials.btpl.yaml | kubectl apply -f -
kubectl apply -f cluster/controlcenter.yaml
kubectl -n confluent-ldap port-forward service/controlcenter 9021:80
Navigate to http://localhost:9021
kubectl delete -f clie
kubectl delete -f cluster
kubectl -n confluent-ldap delete secret -l app.kubernetes.io/instance=confluent
kubectl -n confluent-ldap delete pvc -l app.kubernetes.io/instance=confluent
kubectl delete -f namespace.yaml
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=kafka-cli -o name) -- bash
Configure the kafka super user:
export KAFKA_USERNAME=kafka
export KAFKA_PASSWORD='my_kafka_password'
cat > kafka.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${KAFKA_USERNAME}" \
password="${KAFKA_PASSWORD}";
EOF
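A detail worth knowing about the heredoc above: because the delimiter is unquoted, the shell expands the variables at write time, and the trailing backslashes join the sasl.jaas.config value into the single line that Kafka expects. You can verify the rendered file with a self-contained check (dummy credentials, same heredoc):

```shell
# Demonstrate how the unquoted heredoc renders kafka.config:
# variables are expanded and backslash-newline pairs are removed.
export KAFKA_USERNAME=kafka
export KAFKA_PASSWORD='my_kafka_password'
cat > kafka.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${KAFKA_USERNAME}" \
password="${KAFKA_PASSWORD}";
EOF
# The JAAS setting is now a single line containing the expanded credentials:
grep 'username="kafka"' kafka.config
```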
Run kafka-topics as the kafka super user:
kafka-topics --command-config kafka.config --bootstrap-server kafka:9092 --list
This command will list all topics.
Configure the app_geysers user:
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${APP_USERNAME}" \
password="${APP_PASSWORD}";
EOF
Run kafka-topics as the app_geysers user:
kafka-topics --command-config app.config --bootstrap-server kafka:9092 --list
This is a valid user, but it has no permissions yet, so no topics are listed.
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=confluent-cli -o name) -- bash
Login as user kafka:
confluent login
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
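The jq filter simply pulls the crn field out of the JSON document. As a self-contained illustration with a made-up sample (the real field layout depends on your Confluent CLI version):

```shell
# Hypothetical sample of what `confluent cluster describe -o json` may return;
# the real output depends on the Confluent CLI version in use.
cat > describe.json << 'EOF'
{"crn": "kafka-cluster-id-123", "scope": {"clusters": {}}}
EOF
# `-r` prints the raw string without surrounding JSON quotes.
CLUSTER_ID="$(jq -r .crn describe.json)"
echo "$CLUSTER_ID"   # kafka-cluster-id-123
```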
Create role bindings for group team_enceladus:
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperManage \
--resource Topic:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperWrite \
--resource Topic:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperRead \
--resource Topic:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperManage \
--resource Group:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperRead \
--resource Group:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperWrite \
--resource Group:enceladus_ \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding list --kafka-cluster-id $CLUSTER_ID --principal Group:team_enceladus
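The six nearly identical create commands above can also be written as a nested loop. The sketch below is a dry run that only prints the commands; remove the leading echo to execute them.

```shell
# Dry run: print the six role-binding commands instead of executing them.
# Remove the leading `echo` to actually create the bindings
# (requires the confluent CLI and a valid CLUSTER_ID).
CLUSTER_ID="${CLUSTER_ID:-demo-cluster-id}"
for RES in Topic Group; do
  for ROLE in DeveloperManage DeveloperWrite DeveloperRead; do
    echo confluent iam rbac role-binding create \
      --principal Group:team_enceladus \
      --role "$ROLE" \
      --resource "${RES}:enceladus_" \
      --prefix \
      --kafka-cluster-id "$CLUSTER_ID"
  done
done
```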
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=kafka-cli -o name) -- bash
Configure the app_geysers user:
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${APP_USERNAME}" \
password="${APP_PASSWORD}";
EOF
Run kafka-topics as the app_geysers user:
kafka-topics --command-config app.config --bootstrap-server kafka:9092 --list
This is a valid user, but it only has permissions for topics prefixed with enceladus_.
Create the topic enceladus_app1:
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
--create --topic enceladus_app1 --replication-factor 3 --partitions 3
Try to create the topic europa_app1:
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
--create --topic europa_app1 --replication-factor 3 --partitions 3
The user app_geysers was only able to create the topic named enceladus_app1.
Produce a message to enceladus_app1:
echo "test_message" | kafka-console-producer \
--broker-list kafka:9092 \
--topic enceladus_app1 \
--producer.config app.config \
--property parse.key=false
Consume from enceladus_app1 with consumer group enceladus_app1_cg:
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic enceladus_app1 \
--group enceladus_app1_cg \
--consumer.config app.config \
--from-beginning \
--property parse.key=false \
--max-messages 1
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=kafka-cli -o name) -- bash
Configure the app_geysers user:
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${APP_USERNAME}" \
password="${APP_PASSWORD}";
EOF
Create the topic enceladus_app2_avro:
kafka-topics --command-config app.config --bootstrap-server kafka:9092 \
--create --topic enceladus_app2_avro --replication-factor 3 --partitions 3
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=kafkasr-cli -o name) -- bash
curl http://schemaregistry/subjects
This request should fail with the error Unauthorized.
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects
The user should be able to authenticate and an empty list should be returned.
Configure the app_geysers user:
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${APP_USERNAME}" \
password="${APP_PASSWORD}";
EOF
Use kafka-avro-console-producer to register a new schema and publish messages as the app_geysers user:
echo "A:{\"name\":\"Han Solo\",\"age\":30}" |
kafka-avro-console-producer \
--bootstrap-server kafka:9092 \
--producer.config app.config \
--topic enceladus_app2_avro \
--sync \
--request-required-acks -1 \
--message-send-max-retries 3 \
--property schema.registry.url=http://schemaregistry \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${APP_USERNAME}:${APP_PASSWORD} \
--property parse.key=true \
--property key.separator=: \
--property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
--property value.schema="{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
This request should fail with the error message: User is denied operation Write on Subject: enceladus_app2_avro-value.
Before we can run this command, the user must have write permissions for the corresponding Schema Registry subject. Therefore, let's first create the required role bindings.
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=confluent-cli -o name) -- bash
Login as user kafka:
confluent login
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
export SCHEMA_REGISTRY_CLUSTER_ID="schemaregistry" # the schema-registry-group-id of the cluster
Create role bindings for group team_enceladus:
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperManage \
--resource Subject:enceladus_ \
--prefix \
--kafka-cluster-id ${CLUSTER_ID} \
--schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperWrite \
--resource Subject:enceladus_ \
--prefix \
--kafka-cluster-id ${CLUSTER_ID} \
--schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
confluent iam rbac role-binding create \
--principal Group:team_enceladus \
--role DeveloperRead \
--resource Subject:enceladus_ \
--prefix \
--kafka-cluster-id ${CLUSTER_ID} \
--schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
confluent iam rbac role-binding list \
--principal Group:team_enceladus \
--kafka-cluster-id ${CLUSTER_ID} \
--schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
For more information about role bindings for schema registry see: https://docs.confluent.io/platform/current/schema-registry/security/rbac-schema-registry.html
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=kafkasr-cli -o name) -- bash
Configure the app_geysers user:
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
cat > app.config << EOF
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="${APP_USERNAME}" \
password="${APP_PASSWORD}";
EOF
Use kafka-avro-console-producer to register a new schema and publish messages as the app_geysers user:
echo "A:{\"name\":\"Han Solo\",\"age\":30}" |
kafka-avro-console-producer \
--bootstrap-server kafka:9092 \
--producer.config app.config \
--topic enceladus_app2_avro \
--sync \
--request-required-acks -1 \
--message-send-max-retries 3 \
--property schema.registry.url=http://schemaregistry \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${APP_USERNAME}:${APP_PASSWORD} \
--property parse.key=true \
--property key.separator=: \
--property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
--property value.schema="{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
Now the user is authorized to register a schema for the corresponding topic.
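For readability, the escaped value.schema string from the producer command corresponds to the following Avro record. The file name person.avsc is only chosen for this illustration:

```shell
# The value.schema string from the command above, unescaped into a file
# for readability (person.avsc is a name chosen for this example only).
cat > person.avsc << 'EOF'
{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
EOF
# Sanity-check that the schema is at least valid JSON:
python3 -c 'import json; print(json.load(open("person.avsc"))["name"])'   # Person
```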
Use kafka-avro-console-consumer to consume the published messages as the app_geysers user:
kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--consumer.config app.config \
--topic enceladus_app2_avro \
--group enceladus_app1_cg \
--from-beginning \
--max-messages 1 \
--property schema.registry.url=http://schemaregistry \
--property basic.auth.credentials.source=USER_INFO \
--property basic.auth.user.info=${APP_USERNAME}:${APP_PASSWORD} \
--property print.key=true \
--property key.separator=: \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
export APP_USERNAME=app_geysers
export APP_PASSWORD='my_app_password'
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects
curl -u ${APP_USERNAME}:${APP_PASSWORD} http://schemaregistry/subjects/enceladus_app2_avro-value/versions/1/schema
Navigate to http://localhost:9021 and login with user app_geysers. This should work, and you should be able to see all resources, such as topics and schemas, for which corresponding role bindings exist for the user's group.
Next, try to login with user kafka, which is a Kafka super user. The super users have been defined by starting Kafka with the following environment variable:
env:
- name: KAFKA_SUPER_USERS
value: User:admin;User:kafka;User:kafkarest;User:schemaregistry;User:controlcenter;User:ANONYMOUS
However, you will notice that you do not even have permission to see the Kafka cluster itself. The reason for this is that Control Center relies solely on RBAC for access control. Therefore, in order to manage all resources via Control Center, you must create the corresponding role bindings.
kubectl -n confluent-ldap exec -it $(kubectl -n confluent-ldap get pods -l app.kubernetes.io/name=confluent-cli -o name) -- bash
Login as user kafka:
confluent login
apk add jq
export CLUSTER_ID="$(confluent cluster describe -o json | jq -r .crn)"
export SCHEMA_REGISTRY_CLUSTER_ID="schemaregistry" # the schema-registry-group-id of the cluster
Create role bindings for user kafka:
confluent iam rbac role-binding create \
--principal User:kafka \
--role SystemAdmin \
--kafka-cluster-id ${CLUSTER_ID}
confluent iam rbac role-binding create \
--principal User:kafka \
--role SystemAdmin \
--kafka-cluster-id ${CLUSTER_ID} \
--schema-registry-cluster-id ${SCHEMA_REGISTRY_CLUSTER_ID}
Navigate again to http://localhost:9021 and login with user kafka. Now you are able to view and manage any resource that exists in the Confluent Platform cluster.