Typo in AlertConsumer code documentation #101

Open
AlexGKim opened this issue Aug 18, 2021 · 10 comments · May be fixed by #102

Comments

@AlexGKim

AlexGKim commented Aug 18, 2021

Two of the config keys given in the AlertConsumer code documentation are incorrect:

username: str

username -> sasl.username
password -> sasl.password

@JulienPeloton
Member

Thanks for spotting the mistake! I will correct it asap.

@JulienPeloton
Member

Well, actually the doc is correct: AlertConsumer takes as input a config dictionary whose keys include username and password, among several others. These are then translated to Kafka input parameter names in the _get_kafka_config method:

if 'username' in config and 'password' in config:
    kafka_config["security.protocol"] = "sasl_plaintext"
    kafka_config["sasl.mechanism"] = "SCRAM-SHA-512"
    kafka_config["sasl.username"] = config["username"]
    kafka_config["sasl.password"] = config["password"]

It is somewhat unfortunate that the names are not exactly the same, but the reason is that using username and password at the higher level abstracts away the authentication method at the lower level (SASL now, but it could be Kerberos in the future, for example).
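The translation described above can be sketched in isolation. This is only an illustrative stand-in, not the actual fink_client code: the function name translate_config is hypothetical, and the mapping of group_id to Kafka's group.id is an assumption that the thread does not confirm.

```python
def translate_config(config: dict) -> dict:
    """Hypothetical stand-in for the high-level to Kafka key translation."""
    kafka_config = {
        "bootstrap.servers": config["bootstrap.servers"],
        # Assumption: the high-level group_id maps to Kafka's group.id.
        "group.id": config["group_id"],
    }
    # Same condition as the snippet quoted in the thread: the SASL keys
    # are only added when both high-level keys are present.
    if "username" in config and "password" in config:
        kafka_config["security.protocol"] = "sasl_plaintext"
        kafka_config["sasl.mechanism"] = "SCRAM-SHA-512"
        kafka_config["sasl.username"] = config["username"]
        kafka_config["sasl.password"] = config["password"]
    return kafka_config


with_credentials = translate_config({
    "username": "alice",
    "password": "secret",
    "group_id": "my_group",
    "bootstrap.servers": "localhost:9092",
})
without_password = translate_config({
    "username": "alice",
    "group_id": "my_group",
    "bootstrap.servers": "localhost:9092",
})
```

With both keys present, the result carries the sasl.* entries; with the password key omitted, no SASL entry is added at all.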

I've made the documentation more explicit, see #102

@AlexGKim
Author

I did pass in a dictionary with "username" and "password" keys but got an error.

@JulienPeloton
Member

This is surprising. What error exactly did you get? Could you paste the traceback here?

I just tried the following snippet with my credentials, and it worked:

from fink_client.consumer import AlertConsumer

myconfig = {
    'username': '__my_username__',
    'bootstrap.servers': '__servers__',
    'group_id': '__my_group_id__'
}

topics = ["fink_early_sn_candidates_ztf"]

# Instantiate a consumer
consumer = AlertConsumer(topics, myconfig)

# Poll the servers
topic, alert, key = consumer.poll(maxtimeout=5)
print(alert)

@AlexGKim
Author

Traceback (most recent call last):
  File "manage.py", line 22, in <module>
    main()
  File "manage.py", line 18, in main
    execute_from_command_line(sys.argv)
  File "/Users/akim/venv/tom_env/lib/python3.8/site-packages/django/core/management/__init__.py", line 419, in execute_from_command_line
    utility.execute()
  File "/Users/akim/venv/tom_env/lib/python3.8/site-packages/django/core/management/__init__.py", line 413, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/Users/akim/venv/tom_env/lib/python3.8/site-packages/django/core/management/base.py", line 354, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/Users/akim/venv/tom_env/lib/python3.8/site-packages/django/core/management/base.py", line 398, in execute
    output = self.handle(*args, **options)
  File "/Users/akim/project/tom_desc/stream/management/commands/fink.py", line 55, in handle
    consumer = AlertConsumer(["fink_early_sn_candidates_ztf"], FINK_CONSUMER_CONFIGURATION)
  File "/Users/akim/venv/tom_env/lib/python3.8/site-packages/fink_client/consumer.py", line 55, in __init__
    self._consumer = confluent_kafka.Consumer(self._kafka_config)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create consumer: sasl.username and sasl.password must be set"}

@AlexGKim
Author

FINK_CONSUMER_CONFIGURATION = {
    # 'sasl.username': os.getenv('FINK_USERNAME',''),
    # 'sasl.password': os.getenv('FINK_PASSWORD', ''),
    'username': os.getenv('FINK_USERNAME',''),
    'password': os.getenv('FINK_PASSWORD', ''),
    'group_id': os.getenv('FINK_GROUP_ID', ''),
    'bootstrap.servers': os.getenv('FINK_SERVER', ''),
    }

@JulienPeloton
Member

The problem is the password: if it is unset (in this case an empty string, since os.getenv falls back to ''), the code will fail because of this block:

if 'username' in config and 'password' in config:
    kafka_config["security.protocol"] = "sasl_plaintext"
    kafka_config["sasl.mechanism"] = "SCRAM-SHA-512"
    kafka_config["sasl.username"] = config["username"]
    kafka_config["sasl.password"] = config["password"]

I used to simply not pass it when unset, but I will also fix the code so that passing None still initialises the consumer.
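One possible shape for such a guard is sketched below. It is not necessarily the fix that was applied in fink_client; the helper name sasl_settings is hypothetical. The idea is to test the values rather than only the presence of the keys, so that None or an empty string skips the SASL settings entirely.

```python
def sasl_settings(config: dict) -> dict:
    """Return SASL entries only when both credentials are non-empty.

    Hypothetical helper: None and '' are both treated as "unset".
    """
    username = config.get("username")
    password = config.get("password")
    if not username or not password:
        return {}
    return {
        "security.protocol": "sasl_plaintext",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": username,
        "sasl.password": password,
    }


# Keys present but empty, as produced by os.getenv(name, '') on unset variables.
empty_values = sasl_settings({"username": "", "password": ""})
# Password explicitly passed as None.
none_password = sasl_settings({"username": "alice", "password": None})
# Both credentials set.
full = sasl_settings({"username": "alice", "password": "secret"})
```

The trade-off of this design is that a misconfigured environment silently falls back to an unauthenticated connection, so a warning at that point may be worth adding.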

@JulienPeloton
Member

Hmm, actually there might be a breach -- let me check things on the server side...

@AlexGKim
Author

AlexGKim commented Aug 19, 2021

You are much more computer savvy than I am. But I did notice a change at some point where key in dictionary no longer worked; I had to use key in dictionary.keys() instead.
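For a plain Python 3 dict, the two membership tests should give the same answer, so a difference would suggest the object involved was not a plain dict (the thread does not say which object it was). A minimal check, using made-up values:

```python
config = {"username": "", "group_id": "my_group"}

# For a plain dict, both membership tests give the same answer.
in_dict = "username" in config
in_keys = "username" in config.keys()
missing = "password" in config

# Membership says nothing about the value: the key exists but is empty.
has_value = bool(config.get("username"))
```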

@JulienPeloton
Member

@AlexGKim thanks for putting your finger on a larger problem -- I found the authentication mechanism of the cluster was not properly configured (username and password were skipped).

I will fix it, but later. In the meantime, this does not prevent data from flowing normally, but you will be authenticated with the group_id only.
