diff --git a/fink_client/scripts/fink_datatransfer.py b/fink_client/scripts/fink_datatransfer.py index ea026a6..d4fc33f 100755 --- a/fink_client/scripts/fink_datatransfer.py +++ b/fink_client/scripts/fink_datatransfer.py @@ -120,6 +120,14 @@ def main(): print('No alerts the last {} seconds ({} polled)... Exiting'.format(maxtimeout, poll_number)) break + # known mismatches between partitions + # see https://github.com/astrolabsoftware/fink-client/issues/165 + if 'cats_broad_max_prob' in pdf.columns: + pdf['cats_broad_max_prob'] = pdf['cats_broad_max_prob'].astype('float') + + if 'cats_broad_class' in pdf.columns: + pdf['cats_broad_class'] = pdf['cats_broad_class'].astype('float') + if 'tracklet' in pdf.columns: pdf['tracklet'] = pdf['tracklet'].astype('str') @@ -140,14 +148,26 @@ def main(): elif args.partitionby == 'classId': partitioning = ['classId'] - pq.write_to_dataset( - table, - args.outdir, - schema=table_schema, - basename_template='part-{{i}}-{}.parquet'.format(poll_number), - partition_cols=partitioning, - existing_data_behavior='overwrite_or_ignore' - ) + try: + pq.write_to_dataset( + table, + args.outdir, + schema=table_schema, + basename_template='part-{{i}}-{}.parquet'.format(poll_number), + partition_cols=partitioning, + existing_data_behavior='overwrite_or_ignore' + ) + except pa.lib.ArrowTypeError: + print('Schema mismatch detected') + table_schema_ = table.schema + pq.write_to_dataset( + table, + args.outdir, + schema=table_schema_, + basename_template='part-{{i}}-{}.parquet'.format(poll_number), + partition_cols=partitioning, + existing_data_behavior='overwrite_or_ignore' + ) poll_number += len(msgs) if args.verbose: