Remove dependency on Zookeeper when possible #63

Closed
manfredpaul opened this issue Jul 29, 2020 · 4 comments · Fixed by #79

Comments

@manfredpaul

Expected Behavior

Use the KafkaAdminClient when possible instead of accessing Zookeeper nodes directly.

Actual Behavior

A Zookeeper connection is required. We have a Zookeeper secured with Kerberos, which is not supported yet. (And maybe it should not be, if the Kafka Admin Client can make life easier.)

Play to Reproduce the Problem

Run the topic creation code without specifying a ZooKeeper connection.

Logs from the play with Ansible in debug mode

fatal: [localhost]: FAILED! => {"changed": false, "msg": "'zookeeper', 'partitions' and 'replica_factor' parameters are needed when parameter 'state' is 'present'"}

Specifications

  • Library version: master
  • Result of pip list command:

ansible (2.4.3.0)
ansible-lint (4.1.0)
anyconfig (0.9.7)
arrow (0.15.5)
atomicwrites (1.3.0)
attrs (19.3.0)
backports.functools-lru-cache (1.6.1)
backports.ssl-match-hostname (3.7.0.1)
bcrypt (3.1.7)
binaryornot (0.4.4)
Cerberus (1.2)
certifi (2019.11.28)
cffi (1.11.1)
chardet (3.0.4)
click (6.7)
click-completion (0.3.1)
colorama (0.3.9)
configparser (4.0.2)
contextlib2 (0.6.0.post1)
cookiecutter (1.6.0)
cryptography (1.4)
distro (1.4.0)
dnspython (1.15.0)
docker-py (1.10.6)
docker-pycreds (0.4.0)
entrypoints (0.3)
enum34 (1.1.6)
fasteners (0.15)
flake8 (3.7.9)
funcsigs (1.0.2)
functools32 (3.2.3.post2)
future (0.18.2)
git-url-parse (1.2.2)
gitdb2 (2.0.6)
GitPython (2.1.14)
httplib2 (0.10.3)
idna (2.7)
importlib (1.0.3)
importlib-metadata (1.4.0)
ipaddress (1.0.22)
Jinja2 (2.10)
jinja2-time (0.2.0)
kafka-python (1.4.4)
kazoo (2.6.1)
MarkupSafe (1.0)
mccabe (0.6.1)
molecule (2.20.0)
monotonic (1.5)
more-itertools (5.0.0)
packaging (20.0)
paramiko (2.1.2)
pathlib2 (2.3.5)
pathspec (0.7.0)
pbr (5.1.1)
pexpect (4.6.0)
pip (9.0.1)
pluggy (0.13.1)
poyo (0.5.0)
psutil (5.4.6)
ptyprocess (0.6.0)
pure-sasl (0.5.1)
py (1.8.1)
pyasn1 (0.4.4)
pycodestyle (2.5.0)
pycparser (2.18)
pycryptodomex (3.4.3)
pyflakes (2.1.1)
PyNaCl (1.3.0)
pyOpenSSL (17.2.0)
pyparsing (2.4.6)
pytest (4.6.9)
pytest-flake8 (1.0.4)
python-ambariclient (0.6.0)
python-dateutil (2.8.1)
python-gilt (1.2.1)
PyYAML (3.12)
requests (2.10.0)
rpm (0.0.2)
ruamel.ordereddict (0.4.14)
ruamel.yaml (0.16.6)
ruamel.yaml.clib (0.2.0)
scandir (1.10.0)
scp (0.10.2)
selinux (0.2.1)
setuptools (44.0.0)
sh (1.12.14)
six (1.11.0)
smmap2 (2.0.5)
tabulate (0.8.2)
testinfra (1.19.0)
tree-format (0.1.2)
typing (3.7.4.1)
urllib3 (1.25.7)
wcwidth (0.1.8)
websocket-client (0.57.0)
wheel (0.29.0)
whichcraft (0.6.1)
yamllint (1.20.0)

  • Kafka version: 2.5.0
  • Python version: 2.7.5
  • OS: CentOS 7
@StephenSorriaux (Owner)

Hello,

Thanks for this issue.

I actually did my best to avoid talking to ZK as much as possible (that's why the lib doesn't support old broker versions), but there are still some tasks that require a ZK connection:

  • getting the current topic configuration when updating a topic that already exists: there is no API on the Kafka side to easily retrieve this information (that's why the scripts shipped with Kafka require a ZK connection); see the sketch after this list;
  • reassigning partitions (when adding new partitions) or adding replicas to a topic: there is also no API on the Kafka side to do that, and the Kafka scripts proceed in the same way.
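
For illustration, here is a minimal sketch (an assumption about what such a ZK read looks like, not the library's actual code) of the first task: fetching an existing topic's configuration overrides straight from the /config/topics/<topic> znode with kazoo. The topic name and ZK address are placeholders.

    import json
    from kazoo.client import KazooClient

    # Connect to Zookeeper: this is exactly the dependency this issue asks to remove.
    zk = KazooClient(hosts='localhost:2181')  # placeholder address
    zk.start()
    # Kafka stores per-topic config overrides as JSON under /config/topics/<name>.
    data, _stat = zk.get('/config/topics/TestTopic')
    overrides = json.loads(data.decode('utf-8'))['config']
    print(overrides)
    zk.stop()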

The error you are getting is because you are trying to update a topic that already exists; for a "simple" topic creation, ZK is not used. I listed ZK Kerberos support in #17 some time ago, and I will see if I can find some time to add it to the lib.

Overall, I will be able to remove ZK from the lib once KIP-500 is implemented and released.

@michaelandrepearce

list_topics and describe_topics
https://github.com/dpkp/kafka-python/blob/master/kafka/admin/client.py#L516

What has been available in the Java admin client is now in the Python Kafka admin client too, so direct ZK access is not needed.

@StephenSorriaux (Owner)

Hello,

Yes, those 2 APIs do not need any ZK access, but they also do not help in the 2 cases I mentioned previously, since they just use the Metadata request, which only returns a list of topics.

@michaelandrepearce commented Jan 8, 2021

Hi @StephenSorriaux

Have you checked this? Because when I run:

    from kafka.admin import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers='<<yourbrokerid>>.uksouth.azure.confluent.cloud:9092',
                             security_protocol='SASL_SSL',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='replace_with_your_user',
                             sasl_plain_password='replace_with_your_password'
    )
    print(admin.list_topics())
    print(admin.describe_topics(admin.list_topics()))
['TestTopic']

[{'error_code': 0, 'topic': 'TestTopic', 'is_internal': False, 'partitions': [{'error_code': 0, 'partition': 0, 'leader': 2, 'replicas': [2, 1, 0], 'isr': [2, 1, 0], 'offline_replicas': []}, {'error_code': 0, 'partition': 1, 'leader': 7, 'replicas': [7, 6, 8], 'isr': [7, 6, 8], 'offline_replicas': []}, {'error_code': 0, 'partition': 2, 'leader': 3, 'replicas': [3, 2, 7], 'isr': [3, 2, 7], 'offline_replicas': []}]}]

So describe seems to return enough information, at least for topic updates, to determine the current number of partitions, simply by taking the size of the partitions array/list. Then, using admin.create_partitions, the partition count can be altered through the admin client, e.g.

    from kafka.admin import NewPartitions

    admin.create_partitions({
        "TestTopic": NewPartitions(total_count=5)
    })
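
To tie the two calls together, here is a hedged sketch (topic name and target count are illustrative; Kafka can only ever grow a partition count, never shrink it):

    # Read the current partition count via describe_topics(), then grow it
    # with create_partitions() only if the desired count is higher.
    topic_meta = admin.describe_topics(['TestTopic'])[0]
    current_count = len(topic_meta['partitions'])
    desired_count = 5
    if desired_count > current_count:
        admin.create_partitions({'TestTopic': NewPartitions(total_count=desired_count)})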

Likewise, to get further details of a topic's configuration, you can use the describe_configs method (note this is useful for both topic and cluster config access):

    from kafka.admin import ConfigResource, ConfigResourceType

    resource = ConfigResource(resource_type=ConfigResourceType.TOPIC, name='TestTopic')
    config_resources = [resource]
    print(admin.describe_configs(config_resources))

which gives

[DescribeConfigsResponse_v2(throttle_time_ms=0, resources=[(error_code=0, error_message='', resource_type=2, resource_name='TestTopic', config_entries=[(config_names='compression.type', config_value='producer', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='leader.replication.throttled.replicas', config_value='', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='message.downconversion.enable', config_value='true', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='min.insync.replicas', config_value='2', read_only=False, config_source=4, is_sensitive=False, config_synonyms=[]), (config_names='segment.jitter.ms', config_value='0', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='cleanup.policy', config_value='delete', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='flush.ms', config_value='9223372036854775807', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='follower.replication.throttled.replicas', config_value='', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='segment.bytes', config_value='104857600', read_only=False, config_source=4, is_sensitive=False, config_synonyms=[]), (config_names='retention.ms', config_value='604800000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='flush.messages', config_value='9223372036854775807', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='message.format.version', config_value='2.3-IV1', read_only=True, config_source=4, is_sensitive=False, config_synonyms=[]), (config_names='max.compaction.lag.ms', config_value='9223372036854775807', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='file.delete.delay.ms', config_value='60000', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='max.message.bytes', config_value='2097164', read_only=False, config_source=4, is_sensitive=False, config_synonyms=[]), (config_names='min.compaction.lag.ms', config_value='0', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='message.timestamp.type', config_value='CreateTime', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='preallocate', config_value='false', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='min.cleanable.dirty.ratio', config_value='0.5', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='index.interval.bytes', config_value='4096', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='unclean.leader.election.enable', config_value='false', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='retention.bytes', config_value='-1', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='delete.retention.ms', config_value='86400000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='segment.ms', config_value='604800000', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='message.timestamp.difference.max.ms', config_value='9223372036854775807', read_only=False, config_source=5, is_sensitive=False, config_synonyms=[]), (config_names='segment.index.bytes', config_value='10485760', read_only=True, config_source=5, is_sensitive=False, config_synonyms=[])])])]
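
As a hedged follow-up (field positions are assumed from the tuple layout visible in the output above, not from a documented API), the response list can be flattened into a plain name-to-value dict:

    # Flatten the DescribeConfigsResponse list into {config_name: config_value}.
    # Each resource tuple is (error_code, error_message, resource_type,
    # resource_name, config_entries), as seen above.
    topic_configs = {}
    for response in admin.describe_configs(config_resources):
        for resource in response.resources:
            _err, _msg, _rtype, _rname, entries = resource[:5]
            for entry in entries:
                topic_configs[entry[0]] = entry[1]  # entry = (name, value, ...)
    print(topic_configs.get('retention.ms'))  # e.g. '604800000'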

The need for clients to connect to ZK was removed in Kafka 2.2.0 (i.e. the ZK approach was deprecated), making way for the work in KIP-500. So anyone on Kafka 2.2.0 or later no longer needs to connect to ZK for administrative tasks and should ideally transition away from doing so; all the CLI tools were also updated at that time.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-377%3A+TopicCommand+to+use+AdminClient

There is a nice write-up about it here:
https://www.confluent.io/blog/how-to-prepare-for-kip-500-kafka-zookeeper-removal-guide/
