Skip to content

Commit

Permalink
Add random number to the partitioned file name (#187)
Browse files Browse the repository at this point in the history
* Add random number to the partitioned file name

* Bump to 7.1
  • Loading branch information
JulienPeloton authored Sep 14, 2023
1 parent 311358f commit 58985b7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
2 changes: 1 addition & 1 deletion fink_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "7.0"
__version__ = "7.1"
__schema_version__ = "distribution_schema_fink_ztf_{}.avsc"
14 changes: 10 additions & 4 deletions fink_client/scripts/fink_datatransfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import confluent_kafka

import pandas as pd
import numpy as np

from multiprocessing import Process, Queue

Expand Down Expand Up @@ -246,7 +247,7 @@ def return_last_offsets(kafka_config, topic):
return offsets


def poll(process_id, nconsumers, queue, schema, kafka_config, args):
def poll(process_id, nconsumers, queue, schema, kafka_config, rng, args):
""" Poll data from Kafka servers
Parameters
Expand Down Expand Up @@ -364,12 +365,13 @@ def poll(process_id, nconsumers, queue, schema, kafka_config, args):
elif args.partitionby == 'classId':
partitioning = ['classId']

part_num = rng.randint(0, 1e6)
try:
pq.write_to_dataset(
table,
args.outdir,
schema=table_schema,
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, part_num),
partition_cols=partitioning,
existing_data_behavior='overwrite_or_ignore'
)
Expand All @@ -380,7 +382,7 @@ def poll(process_id, nconsumers, queue, schema, kafka_config, args):
table,
args.outdir,
schema=table_schema_,
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, poll_number),
basename_template='part-{}-{{i}}-{}.parquet'.format(process_id, part_num),
partition_cols=partitioning,
existing_data_behavior='overwrite_or_ignore'
)
Expand Down Expand Up @@ -452,6 +454,8 @@ def main():
# Number of consumers to use
if args.nconsumers == -1:
nconsumers = psutil.cpu_count(logical=True)
else:
nconsumers = args.nconsumers

kafka_config = {
'bootstrap.servers': conf['servers'],
Expand Down Expand Up @@ -496,9 +500,11 @@ def main():
})

# Processes Creation
random_state = 0
rng = np.random.RandomState(random_state)
procs = []
for i in range(nconsumers):
proc = Process(target=poll, args=(i, nconsumers, available, schema, kafka_config, args))
proc = Process(target=poll, args=(i, nconsumers, available, schema, kafka_config, rng, args))
procs.append(proc)
proc.start()

Expand Down

0 comments on commit 58985b7

Please sign in to comment.