Add new functionalities to reset offsets #194

Merged (5 commits) on Jun 4, 2024
Changes from all commits
41 changes: 22 additions & 19 deletions README.md
@@ -48,28 +48,31 @@ Once you have your credentials, you are ready to poll streams!

```bash
fink_consumer -h
-usage: fink_consumer [-h] [--display] [-limit LIMIT] [--available_topics]
-                     [--save] [-outdir OUTDIR] [-schema SCHEMA]
-                     [--dump_schema]
+usage: fink_consumer [-h] [--display] [--display_statistics] [-limit LIMIT]
+                     [--available_topics] [--save] [-outdir OUTDIR]
+                     [-schema SCHEMA] [--dump_schema] [-start_at START_AT]

-Kafka consumer to listen and archive Fink streams from the Livestream
-service
+Kafka consumer to listen and archive Fink streams from the Livestream service

optional arguments:
-  -h, --help          show this help message and exit
-  --display           If specified, print on screen information about
-                      incoming alert.
-  -limit LIMIT        If specified, download only `limit` alerts. Default is
-                      None.
-  --available_topics  If specified, print on screen information about
-                      available topics.
-  --save              If specified, save alert data on disk (Avro). See also
-                      -outdir.
-  -outdir OUTDIR      Folder to store incoming alerts if --save is set. It
-                      must exist.
-  -schema SCHEMA      Avro schema to decode the incoming alerts. Default is
-                      None (version taken from each alert)
-  --dump_schema       If specified, save the schema on disk (json file)
+  -h, --help            show this help message and exit
+  --display             If specified, print on screen information about
+                        incoming alert.
+  --display_statistics  If specified, print on screen information about queues,
+                        and exit.
+  -limit LIMIT          If specified, download only `limit` alerts. Default is
+                        None.
+  --available_topics    If specified, print on screen information about
+                        available topics.
+  --save                If specified, save alert data on disk (Avro). See also
+                        -outdir.
+  -outdir OUTDIR        Folder to store incoming alerts if --save is set. It
+                        must exist.
+  -schema SCHEMA        Avro schema to decode the incoming alerts. Default is
+                        None (version taken from each alert)
+  --dump_schema         If specified, save the schema on disk (json file)
+  -start_at START_AT    If specified, reset offsets to 0 (`earliest`) or empty
+                        queue (`latest`).
```
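
The new `-start_at` argument makes the offset reset available from the command line. A small sketch of how it can be combined with existing flags (assuming your credentials and topics are already configured as usual):

```bash
# Replay the stream from the beginning of the queue
fink_consumer --display -start_at earliest

# Skip everything currently in the queue and wait for new alerts only
fink_consumer --display -start_at latest
```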

You can also look at an alert on the disk:
2 changes: 1 addition & 1 deletion fink_client/__init__.py
@@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "8.0"
+__version__ = "8.1"
__schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
6 changes: 6 additions & 0 deletions fink_client/avro_utils.py
@@ -245,6 +245,12 @@ def write_alert(
    path: str
        Folder that will contain the alert. The filename will always be
        <objectID>.avro
+    overwrite: bool, optional
+        If True, overwrite existing alert. Default is False.
+    id1: str, optional
+        First prefix for alert name: {id1}_{id2}.avro
+    id2: str, optional
+        Second prefix for alert name: {id1}_{id2}.avro

    Examples
    --------
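
As an illustration of the new `write_alert` arguments, a hypothetical call could look like this; `alert`, `schema`, and `outdir` are placeholders, and the fields chosen for `id1` and `id2` are an assumption, not prescribed by the diff:

```python
from fink_client.avro_utils import write_alert

# Hypothetical usage: the file would be written as <id1>_<id2>.avro in `outdir`
write_alert(
    alert,  # decoded alert packet (dict)
    schema,  # Avro schema used to encode the alert
    outdir,  # existing output folder
    overwrite=True,  # replace the file if it already exists
    id1=alert["objectId"],  # assumed field for the first part of the name
    id2=str(alert["candidate"]["jd"]),  # assumed field for the second part
)
```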
245 changes: 236 additions & 9 deletions fink_client/consumer.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
+import sys
import json
import time
import fastavro
@@ -35,7 +36,14 @@ class AlertError(Exception):
class AlertConsumer:
    """High level Kafka consumer to receive alerts from Fink broker"""

-    def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
+    def __init__(
+        self,
+        topics: list,
+        config: dict,
+        schema_path=None,
+        dump_schema=False,
+        on_assign=None,
+    ):
        """Creates an instance of `AlertConsumer`

        Parameters
@@ -52,12 +60,27 @@ def __init__(self, topics: list, config: dict, schema_path=None, dump_schema=False):
                group.id for Kafka consumer
            bootstrap.servers: str, optional
                Kafka servers to connect to
+        schema_path: str, optional
+            If specified, path to an alert schema (avsc).
+            Default is None.
+        dump_schema: bool, optional
+            If True, save incoming alert schema on disk.
+            Useful for schema inspection when getting `IndexError`.
+            Default is False.
+        on_assign: callable, optional
+            Callback to update the current assignment
+            and specify start offsets. Default is None.
+
        """
        self._topics = topics
        self._kafka_config = _get_kafka_config(config)
        self.schema_path = schema_path
        self._consumer = confluent_kafka.Consumer(self._kafka_config)
-        self._consumer.subscribe(self._topics)
+
+        if on_assign is not None:
+            self._consumer.subscribe(self._topics, on_assign=on_assign)
+        else:
+            self._consumer.subscribe(self._topics)
        self.dump_schema = dump_schema

    def __enter__(self):
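
As a sketch of what the new `on_assign` hook enables, the callback below rewinds every assigned partition to the earliest available offset. This is an illustration rather than code from the diff; `mytopics` and `myconfig` are placeholders for your topics and credentials:

```python
import confluent_kafka

from fink_client.consumer import AlertConsumer

def seek_to_beginning(consumer, partitions):
    """Reset all assigned partitions to the earliest available offset."""
    for partition in partitions:
        partition.offset = confluent_kafka.OFFSET_BEGINNING
    consumer.assign(partitions)

# mytopics (list) and myconfig (dict) are placeholders -- see the README
consumer = AlertConsumer(mytopics, myconfig, on_assign=seek_to_beginning)
```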
@@ -281,7 +304,9 @@ def close(self):
        self._consumer.close()


-def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
+def return_offsets(
+    consumer, topic, waitfor=1, timeout=10, hide_empty_partition=True, verbose=False
+):
    """Poll servers to get the total committed offsets, and remaining lag

    Parameters
@@ -294,6 +319,9 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
        Time in second to wait before polling. Default is 1 second.
    timeout: int, optional
        Timeout in second when polling the servers. Default is 10.
+    hide_empty_partition: bool, optional
+        If True, display only non-empty partitions.
+        Default is True.
    verbose: bool, optional
        If True, prints useful table. Default is False.

@@ -357,18 +385,117 @@ def return_offsets(consumer, topic, waitfor=1, timeout=10, verbose=False):
        total_lag = total_lag + int(lag)

        if verbose:
-            print(
-                "%-50s %9s %9s"
-                % ("{} [{}]".format(partition.topic, partition.partition), offset, lag)
-            )
+            if (hide_empty_partition and offset != "-") or (not hide_empty_partition):
+                print(
+                    "%-50s %9s %9s"
+                    % (
+                        "{} [{}]".format(partition.topic, partition.partition),
+                        offset,
+                        lag,
+                    )
+                )
    if verbose:
        print("-" * 72)
-        print("%-50s %9s %9s" % ("Total", total_offsets, total_lag))
+        print(
+            "%-50s %9s %9s" % ("Total for {}".format(topic), total_offsets, total_lag)
+        )
        print("-" * 72)

    return total_offsets, total_lag
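
A usage sketch for the new `hide_empty_partition` flag; the configuration and topic name below are placeholders:

```python
import confluent_kafka

from fink_client.consumer import return_offsets

# Placeholder configuration -- adapt to your credentials
kafka_config = {"group.id": "mygroup", "bootstrap.servers": "localhost:9092"}
mytopic = "mytopic"

consumer = confluent_kafka.Consumer(kafka_config)
consumer.subscribe([mytopic])

# Show the table for all partitions, including empty ones
total_offsets, total_lag = return_offsets(
    consumer, mytopic, verbose=True, hide_empty_partition=False
)
consumer.close()
```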


+def return_last_offsets(kafka_config, topic):
+    """Return the last offsets
+
+    Parameters
+    ----------
+    kafka_config: dict
+        Kafka consumer config
+    topic: str
+        Topic name
+
+    Returns
+    -------
+    offsets: list
+        Last offsets of each partition
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+    topics = ["{}".format(topic)]
+    consumer.subscribe(topics)
+
+    metadata = consumer.list_topics(topic)
+    if metadata.topics[topic].error is not None:
+        raise confluent_kafka.KafkaException(metadata.topics[topic].error)
+
+    # List of partitions
+    partitions = [
+        confluent_kafka.TopicPartition(topic, p)
+        for p in metadata.topics[topic].partitions
+    ]
+    committed = consumer.committed(partitions)
+    offsets = []
+    for partition in committed:
+        if partition.offset != confluent_kafka.OFFSET_INVALID:
+            offsets.append(partition.offset)
+        else:
+            offsets.append(0)
+
+    consumer.close()
+    return offsets
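
For instance, the committed positions could be inspected per partition before deciding on a reset (placeholder names as above):

```python
from fink_client.consumer import return_last_offsets

kafka_config = {"group.id": "mygroup", "bootstrap.servers": "localhost:9092"}

# One entry per partition; 0 where nothing has been committed yet
offsets = return_last_offsets(kafka_config, "mytopic")
for partition, offset in enumerate(offsets):
    print("partition {}: committed offset {}".format(partition, offset))
```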


+def print_offsets(
+    kafka_config, topic, maxtimeout=10, hide_empty_partition=True, verbose=True
+):
+    """Wrapper around `consumer.return_offsets`
+
+    If the server is rebalancing the offsets, it will exit the program.
+
+    Parameters
+    ----------
+    kafka_config: dict
+        Dictionary with consumer parameters
+    topic: str
+        Topic name
+    maxtimeout: int, optional
+        Timeout in second, when polling the servers
+    hide_empty_partition: bool, optional
+        If True, display only non-empty partitions.
+        Default is True
+    verbose: bool, optional
+        If True, prints useful table. Default is True.
+
+    Returns
+    -------
+    total_lag: int
+        Remaining messages in the topic across all partitions.
+    total_offsets: int
+        Total number of messages committed across all partitions
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+
+    topics = ["{}".format(topic)]
+    consumer.subscribe(topics)
+    total_offset, total_lag = return_offsets(
+        consumer,
+        topic,
+        timeout=maxtimeout,
+        waitfor=0,
+        verbose=verbose,
+        hide_empty_partition=hide_empty_partition,
+    )
+    if (total_offset, total_lag) == (-1, -1):
+        print(
+            "Warning: Consumer group '{}' is rebalancing. Please wait.".format(
+                kafka_config["group.id"]
+            )
+        )
+        sys.exit()
+    consumer.close()
+
+    return total_lag, total_offset
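
A usage sketch, again with placeholder names; note that the lag is returned first:

```python
from fink_client.consumer import print_offsets

kafka_config = {"group.id": "mygroup", "bootstrap.servers": "localhost:9092"}

# Prints a per-partition table and returns the totals
total_lag, total_offsets = print_offsets(kafka_config, "mytopic", maxtimeout=10)
print("{} alert(s) remaining in the queue".format(total_lag))
```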


def _get_kafka_config(config: dict) -> dict:
    """Returns configurations for a consumer instance

@@ -392,7 +519,7 @@ def _get_kafka_config(config: dict) -> dict:
    kafka_config["sasl.username"] = config["username"]
    kafka_config["sasl.password"] = config["password"]

-    kafka_config["group.id"] = config["group_id"]
+    kafka_config["group.id"] = config["group.id"]

    kafka_config.update(default_config)

@@ -405,3 +532,103 @@ def _get_kafka_config(config: dict) -> dict:
    kafka_config["bootstrap.servers"] = "{}".format(",".join(fink_servers))

    return kafka_config


+def return_npartitions(topic, kafka_config):
+    """Get the number of partitions
+
+    Parameters
+    ----------
+    topic: str
+        Topic name
+    kafka_config: dict
+        Dictionary with consumer parameters
+
+    Returns
+    -------
+    nbpartitions: int
+        Number of partitions in the topic
+
+    """
+    consumer = confluent_kafka.Consumer(kafka_config)
+
+    # Details to get
+    nbpartitions = 0
+    try:
+        # Topic metadata
+        metadata = consumer.list_topics(topic=topic)
+
+        if metadata.topics and topic in metadata.topics:
+            partitions = metadata.topics[topic].partitions
+            nbpartitions = len(partitions)
+        else:
+            print("The topic {} does not exist".format(topic))
+
+    except confluent_kafka.KafkaException as e:
+        print(f"Error while getting the number of partitions: {e}")
+
+    consumer.close()
+
+    return nbpartitions


+def return_partition_offset(consumer, topic, partition):
+    """Return the number of messages in a partition
+
+    Parameters
+    ----------
+    consumer: confluent_kafka.Consumer
+        Kafka consumer
+    topic: str
+        Topic name
+    partition: int
+        The partition number
+
+    Returns
+    -------
+    partition_size: int
+        Total number of messages in the partition
+    """
+    topicPartition = confluent_kafka.TopicPartition(topic, partition)
+    low_offset, high_offset = consumer.get_watermark_offsets(topicPartition)
+    partition_size = high_offset - low_offset
+
+    return partition_size
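
Together, these two helpers can report the size of each partition of a topic (a sketch with placeholder names):

```python
import confluent_kafka

from fink_client.consumer import return_npartitions, return_partition_offset

kafka_config = {"group.id": "mygroup", "bootstrap.servers": "localhost:9092"}
mytopic = "mytopic"

nbpartitions = return_npartitions(mytopic, kafka_config)

consumer = confluent_kafka.Consumer(kafka_config)
for partition in range(nbpartitions):
    size = return_partition_offset(consumer, mytopic, partition)
    print("partition {}: {} message(s)".format(partition, size))
consumer.close()
```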


+def get_schema_from_stream(kafka_config, topic, maxtimeout):
+    """Poll the schema data from the schema topic
+
+    Parameters
+    ----------
+    kafka_config: dict
+        Dictionary with consumer parameters
+    topic: str
+        Topic name
+    maxtimeout: int
+        Timeout in second, when polling the servers
+
+    Returns
+    -------
+    schema: None or dict
+        Schema data. None if the poll was not successful.
+        Reasons to get None:
+        1. timeout has been reached (increase timeout)
+        2. topic is empty (produce new data)
+        3. topic does not exist (create the topic)
+    """
+    # Instantiate a consumer
+    consumer_schema = confluent_kafka.Consumer(kafka_config)
+
+    # Subscribe to schema topic
+    topics = ["{}_schema".format(topic)]
+    consumer_schema.subscribe(topics)
+
+    # Poll
+    msg = consumer_schema.poll(maxtimeout)
+    if msg is not None:
+        schema = fastavro.schema.parse_schema(json.loads(msg.key()))
+    else:
+        schema = None
+
+    consumer_schema.close()
+
+    return schema
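
Finally, a sketch of polling the associated `<topic>_schema` topic (placeholder names again):

```python
from fink_client.consumer import get_schema_from_stream

kafka_config = {"group.id": "mygroup", "bootstrap.servers": "localhost:9092"}

schema = get_schema_from_stream(kafka_config, "mytopic", maxtimeout=10)
if schema is None:
    print("No schema retrieved: check the timeout and that the topic exists")
```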