Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the default number of consumers to the number of CPUs in the system #185

Merged
merged 5 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pip install fink-client --upgrade
You will also need to install `fastavro==1.6.0` separately (versions above are not compatible with alert schema):

```
# fastavro 1.6.1 requires Cython<3
# fastavro 1.6.0 requires Cython<3
pip install "Cython<3"
pip install --no-build-isolation "fastavro==1.6.0"
```
Expand Down Expand Up @@ -108,7 +108,8 @@ More information at [docs/livestream](https://fink-broker.readthedocs.io/en/late
If you requested data using the [Data Transfer service](https://fink-portal.org/download), you can easily poll your stream using:

```bash
usage: fink_datatransfer [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [-nconsumers NCONSUMERS] [-maxtimeout MAXTIMEOUT] [--restart_from_beginning] [--verbose]
usage: fink_datatransfer.py [-h] [-topic TOPIC] [-limit LIMIT] [-outdir OUTDIR] [-partitionby PARTITIONBY] [-batchsize BATCHSIZE] [-nconsumers NCONSUMERS]
[-maxtimeout MAXTIMEOUT] [-number_partitions NUMBER_PARTITIONS] [--restart_from_beginning] [--verbose]

Kafka consumer to listen and archive Fink streams from the data transfer service

Expand All @@ -118,16 +119,19 @@ optional arguments:
-limit LIMIT If specified, download only `limit` alerts from the stream. Default is None, that is download all alerts.
-outdir OUTDIR Folder to store incoming alerts. It will be created if it does not exist.
-partitionby PARTITIONBY
Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). `classId` is also available for ELASTiCC data. Default is time.
Partition data by `time` (year=YYYY/month=MM/day=DD), or `finkclass` (finkclass=CLASS), or `tnsclass` (tnsclass=CLASS). `classId` is
also available for ELASTiCC data. Default is time.
-batchsize BATCHSIZE Maximum number of alerts within the `maxtimeout` (see conf). Default is 1000 alerts.
-nconsumers NCONSUMERS
Number of parallel consumer to use. Default is 1.
Number of parallel consumers to use. Default (-1) is the number of logical CPUs in the system.
-maxtimeout MAXTIMEOUT
Overwrite the default timeout (in seconds) from user configuration. Default is None.
-number_partitions NUMBER_PARTITIONS
Number of partitions for the topic in the distant Kafka cluster. Do not touch unless you know what you are doing. Default is 10
(Fink Kafka cluster).
--restart_from_beginning
If specified, restart downloading from the 1st alert in the stream. Default is False.
--verbose If specified, print on screen information about the consuming.

```

More information at [docs/datatransfer](https://fink-broker.readthedocs.io/en/latest/services/data_transfer/).
36 changes: 23 additions & 13 deletions fink_client/scripts/fink_datatransfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import argparse
import logging
import psutil

from tqdm import trange

Expand Down Expand Up @@ -245,13 +246,15 @@ def return_last_offsets(kafka_config, topic):
return offsets


def poll(processId, queue, schema, kafka_config, args):
def poll(process_id, nconsumers, queue, schema, kafka_config, args):
""" Poll data from Kafka servers

Parameters
----------
processId: int
process_id: int
ID of the process used for multiprocessing
nconsumers: int
Number of consumers (cores) in parallel
queue: Multiprocessing.Queue
Shared queue between processes where are stocked partitions
and the last offset of the partition
Expand All @@ -269,7 +272,7 @@ def poll(processId, queue, schema, kafka_config, args):
# topics = ['{}'.format(args.topic)]

# infinite loop
maxpoll = int(args.limit / args.nconsumers) if args.limit is not None else 1e10
maxpoll = int(args.limit / nconsumers) if args.limit is not None else 1e10
disable = not args.verbose

poll_number = 0
Expand Down Expand Up @@ -315,7 +318,7 @@ def poll(processId, queue, schema, kafka_config, args):
# Decode the message
if msgs is not None:
if len(msgs) == 0:
print('[{}] No alerts the last {} seconds ({} polled)... Have to exit(1)\n'.format(processId, args.maxtimeout, poll_number))
print('[{}] No alerts the last {} seconds ({} polled)... Have to exit(1)\n'.format(process_id, args.maxtimeout, poll_number))
# Alerts can be added in the partition later
# putting it again in the queue
# changing the offset to continue where we stopped
Expand All @@ -330,7 +333,7 @@ def poll(processId, queue, schema, kafka_config, args):
[fastavro.schemaless_reader(io.BytesIO(msg.value()), schema) for msg in msgs],
)
if pdf.empty:
# print('[{}] No alerts the last {} seconds ({} polled)... Exiting\n'.format(processId, args.maxtimeout, poll_number))
# print('[{}] No alerts the last {} seconds ({} polled)... Exiting\n'.format(process_id, args.maxtimeout, poll_number))
break

# known mismatches between partitions
Expand Down Expand Up @@ -366,7 +369,7 @@ def poll(processId, queue, schema, kafka_config, args):
table,
args.outdir,
schema=table_schema,
basename_template='part-{}-{{i}}-{}.parquet'.format(processId, poll_number),
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
partition_cols=partitioning,
existing_data_behavior='overwrite_or_ignore'
)
Expand All @@ -377,7 +380,7 @@ def poll(processId, queue, schema, kafka_config, args):
table,
args.outdir,
schema=table_schema_,
basename_template='part-{}-{{i}}-{}.parquet'.format(processId, poll_number),
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
partition_cols=partitioning,
existing_data_behavior='overwrite_or_ignore'
)
Expand All @@ -393,7 +396,7 @@ def poll(processId, queue, schema, kafka_config, args):
})
break
else:
logging.info('[{}] No alerts the last {} seconds ({} polled)\n'.format(processId, args.maxtimeout, poll_number))
logging.info('[{}] No alerts the last {} seconds ({} polled)\n'.format(process_id, args.maxtimeout, poll_number))
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
consumer.close()
Expand All @@ -419,11 +422,14 @@ def main():
'-batchsize', type=int, default=1000,
help="Maximum number of alert within the `maxtimeout` (see conf). Default is 1000 alerts.")
parser.add_argument(
'-nconsumers', type=int, default=1,
help="Number of parallel consumer to use. Default is 1.")
'-nconsumers', type=int, default=-1,
help="Number of parallel consumer to use. Default (-1) is the number of logical CPUs in the system.")
parser.add_argument(
'-maxtimeout', type=float, default=None,
help="Overwrite the default timeout (in seconds) from user configuration. Default is None.")
parser.add_argument(
'-number_partitions', type=int, default=10,
help="Number of partitions for the topic in the distant Kafka cluster. Do not touch unless you know what your are doing. Default is 10 (Fink Kafka cluster)")
parser.add_argument(
'--restart_from_beginning', action='store_true',
help="If specified, restart downloading from the 1st alert in the stream. Default is False.")
Expand All @@ -443,6 +449,10 @@ def main():
if args.maxtimeout is None:
args.maxtimeout = conf['maxtimeout']

# Number of consumers to use
if args.nconsumers == -1:
nconsumers = psutil.cpu_count(logical=True)

kafka_config = {
'bootstrap.servers': conf['servers'],
'group.id': conf['group_id'],
Expand All @@ -453,7 +463,7 @@ def main():
total_lag, total_offset = print_offsets(kafka_config, args.topic, args.maxtimeout, verbose=False)
args.total_lag = total_offset
args.total_offset = 0
offsets = [0 for _ in range(10)]
offsets = [0 for _ in range(args.number_partitions)]
else:
total_lag, total_offset = print_offsets(kafka_config, args.topic, args.maxtimeout)
args.total_lag = total_lag
Expand Down Expand Up @@ -487,8 +497,8 @@ def main():

# Processes Creation
procs = []
for i in range(args.nconsumers):
proc = Process(target=poll, args=(i, available, schema, kafka_config, args))
for i in range(nconsumers):
proc = Process(target=poll, args=(i, nconsumers, available, schema, kafka_config, args))
procs.append(proc)
proc.start()

Expand Down