From 58985b738c28ce8c9bb58343620f15092afb0624 Mon Sep 17 00:00:00 2001
From: Julien
Date: Thu, 14 Sep 2023 14:00:31 +0200
Subject: [PATCH] Add random number to the partitioned file name (#187)

* Add random number to the partitioned file name

* Bump to 7.1
---
 fink_client/__init__.py                  |  2 +-
 fink_client/scripts/fink_datatransfer.py | 14 ++++++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/fink_client/__init__.py b/fink_client/__init__.py
index 3f5178b..8c7b140 100644
--- a/fink_client/__init__.py
+++ b/fink_client/__init__.py
@@ -12,5 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-__version__ = "7.0"
+__version__ = "7.1"
 __schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
diff --git a/fink_client/scripts/fink_datatransfer.py b/fink_client/scripts/fink_datatransfer.py
index 3a2bd75..1121482 100755
--- a/fink_client/scripts/fink_datatransfer.py
+++ b/fink_client/scripts/fink_datatransfer.py
@@ -32,6 +32,7 @@
 
 import confluent_kafka
 import pandas as pd
+import numpy as np
 
 from multiprocessing import Process, Queue
 
@@ -246,7 +247,7 @@ def return_last_offsets(kafka_config, topic):
     return offsets
 
-def poll(process_id, nconsumers, queue, schema, kafka_config, args):
+def poll(process_id, nconsumers, queue, schema, kafka_config, rng, args):
     """ Poll data from Kafka servers
 
     Parameters
     ----------
@@ -364,12 +365,13 @@ def poll(process_id, nconsumers, queue, schema, kafka_config, rng, args):
     elif args.partitionby == 'classId':
         partitioning = ['classId']
 
+    part_num = rng.randint(0, 1e6)
     try:
         pq.write_to_dataset(
             table,
             args.outdir,
             schema=table_schema,
-            basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
+            basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, part_num),
             partition_cols=partitioning,
             existing_data_behavior='overwrite_or_ignore'
         )
@@ -380,7 +382,7 @@ def poll(process_id, nconsumers, queue, schema, kafka_config, args):
             table,
             args.outdir,
             schema=table_schema_,
-            basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
+            basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, part_num),
             partition_cols=partitioning,
             existing_data_behavior='overwrite_or_ignore'
         )
@@ -452,6 +454,8 @@ def main():
     # Number of consumers to use
     if args.nconsumers == -1:
         nconsumers = psutil.cpu_count(logical=True)
+    else:
+        nconsumers = args.nconsumers
 
     kafka_config = {
         'bootstrap.servers': conf['servers'],
@@ -496,9 +500,11 @@ def main():
     })
 
     # Processes Creation
+    random_state = 0
+    rng = np.random.RandomState(random_state)
     procs = []
     for i in range(nconsumers):
-        proc = Process(target=poll, args=(i, nconsumers, available, schema, kafka_config, args))
+        proc = Process(target=poll, args=(i, nconsumers, available, schema, kafka_config, rng, args))
         procs.append(proc)
         proc.start()