From 02d9069faf9b6068cf71644177e39d5d38ea56e0 Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Sat, 23 Dec 2023 21:17:18 +0100 Subject: [PATCH 1/5] Change rowkey format for SSO resolver --- bin/index_sso_resolver.py | 61 ++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/bin/index_sso_resolver.py b/bin/index_sso_resolver.py index d0c18860..bc5d61c1 100644 --- a/bin/index_sso_resolver.py +++ b/bin/index_sso_resolver.py @@ -60,16 +60,46 @@ def mark_as_duplicate(series, count=0): """ if count == 0: # initialise markers - series = series.apply(lambda x: x + "_{}".format(count)) + series = series.apply(lambda x: x + "-{}".format(count)) mask = series.duplicated(keep='first') if np.sum(mask) > 0: # duplicates exists - series[mask] = series[mask].apply(lambda x: x.replace('_{}'.format(count), '_{}'.format(count + 1))) + series[mask] = series[mask].apply( + lambda x: x.replace('-{}'.format(count), '-{}'.format(count + 1)) + ) return mark_as_duplicate(series.copy(), count + 1) else: return series +def sort_indices(pdf, column='ssodnet'): + """ Isolate decimal values in `column`, and sort the DataFrame by ascending order + + Parameters + ---------- + pdf: pd.DataFrame + Pandas DataFrame + column: str + Column name + + Returns + ---------- + pdf: pd.DataFrame + Sorted DataFrame + """ + is_decimal = pdf[column].apply(lambda x: x.isdecimal()) + + pdf1 = pdf[is_decimal] + pdf1[column] = pdf1[column].astype(int) + pdf1 = pdf1.sort_values(column, ascending=True) + pdf1[column] = pdf1[column].astype(str) + + pdf2 = pdf[~is_decimal] + + pdf = pd.concat((pdf1, pdf2)) + + return pdf + def main(): parser = argparse.ArgumentParser(description=__doc__) @@ -114,12 +144,12 @@ def main(): sso_name, sso_number = rockify(pdf.ssnamenr.copy()) # fill None with values from original ssnamenr - mask1 = np.equal(sso_name, None) - sso_name[mask1] = pdf.ssnamenr[mask1] + has_no_name = np.equal(sso_name, None) + sso_name[has_no_name] = pdf.ssnamenr[has_no_name] # Keep only valid number - mask2 = np.equal(sso_number, sso_number) - sso_number_valid = sso_number[mask2] + is_valid_number = np.equal(sso_number, sso_number) + sso_number_valid = sso_number[is_valid_number] # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr) index_ssodnet = np.concatenate((sso_name, sso_number_valid, pdf.ssnamenr.values)) @@ -134,7 +164,15 @@ def main(): ) # create index vector for Fink - index_fink = np.concatenate((pdf.ssnamenr.values, pdf.ssnamenr[mask2].values, pdf.ssnamenr.values)) + index_fink = np.concatenate( + ( + pdf.ssnamenr.values, + pdf.ssnamenr[is_valid_number].values, + pdf.ssnamenr.values + ) + ) + index_name = np.concatenate((sso_name, sso_name[is_valid_number], sso_name)) + index_number = np.concatenate((sso_number, sso_number_valid, sso_number)) msg = """ Number of (unique) SSO objects in Fink: {:,} @@ -149,8 +187,8 @@ def main(): len(pdf), len(sso_number_valid), len(sso_number) - len(sso_number_valid), - np.sum(~mask1 * ~mask2), - np.sum(mask1) + np.sum(~has_no_name * ~is_valid_number), + np.sum(has_no_name) ) logger.info(msg) @@ -159,11 +197,14 @@ def main(): { 'ssodnet': index_ssodnet, 'ssnamenr': index_fink, - 'source': index_source + 'name': index_name, + 'number': index_number }, dtype=str ) + pdf_index = sort_indices(pdf_index) + index_col = mark_as_duplicate(pdf_index['ssodnet'].copy()) pdf_index['ssodnet'] = index_col From f690a073b643253804a0b09dfcc97832a579fd38 Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Sat, 23 Dec 2023 21:20:31 +0100 Subject: [PATCH 2/5] Make the rowkey lower case --- bin/index_sso_resolver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bin/index_sso_resolver.py b/bin/index_sso_resolver.py index bc5d61c1..fd4220a3 100644 --- a/bin/index_sso_resolver.py +++ b/bin/index_sso_resolver.py @@ -208,6 +208,9 @@ def main(): index_col = mark_as_duplicate(pdf_index['ssodnet'].copy()) pdf_index['ssodnet'] = index_col + # Make the ssodnet lower case (case-insensitive) + pdf_index['ssodnet'] = pdf_index['ssodnet'].apply(lambda x: x.lower()) + df_index = spark.createDataFrame(pdf_index) cf = {i: 'i' for i in df_index.columns} From e879fc32da81ad21479a04bda1ebc360b945cd1a Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Sat, 23 Dec 2023 21:27:51 +0100 Subject: [PATCH 3/5] Make the TNS resolver table rowkey lower case --- bin/download_tns.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bin/download_tns.py b/bin/download_tns.py index 088a9d69..df801f66 100644 --- a/bin/download_tns.py +++ b/bin/download_tns.py @@ -13,6 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import pyspark.sql.functions as F + import pandas as pd import argparse import os @@ -24,7 +26,7 @@ def format_tns_for_hbase(pdf: pd.DataFrame) -> pd.DataFrame: - """ Format the raw TNS data for HBase consumption + """ Format the raw TNS data for HBase ingestion """ # Add new or rename columns pdf['fullname'] = pdf['name_prefix'] + ' ' + pdf['name'] @@ -78,6 +80,9 @@ def main(): cols=columns ) + # make the rowkey lower case + df_index = df_index.withColumn('index_row_key_name', F.lower('index_row_key_name')) + cf = {i: 'd' for i in df_index.columns} push_to_hbase( From ff350ef32498c2b6a8ddeae90f46c5bd63980625 Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Sat, 23 Dec 2023 21:30:40 +0100 Subject: [PATCH 4/5] PEP8 --- bin/index_sso_resolver.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/bin/index_sso_resolver.py b/bin/index_sso_resolver.py index fd4220a3..4738da20 100644 --- a/bin/index_sso_resolver.py +++ b/bin/index_sso_resolver.py @@ -72,6 +72,7 @@ def mark_as_duplicate(series, count=0): else: return series + def sort_indices(pdf, column='ssodnet'): """ Isolate decimal values in `column`, and sort the DataFrame by ascending order @@ -154,15 +155,6 @@ def main(): # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr) index_ssodnet = np.concatenate((sso_name, sso_number_valid, pdf.ssnamenr.values)) - # Source identifier - index_source = np.concatenate( - ( - ['name'] * len(sso_name), - ['number'] * len(sso_number_valid), - ['ssnamenr'] * len(pdf) - ) - ) - # create index vector for Fink index_fink = np.concatenate( ( From 2e1e09612dc70e4090268f5c1bdc88e380869c6e Mon Sep 17 00:00:00 2001 From: JulienPeloton Date: Sun, 24 Dec 2023 13:50:43 +0100 Subject: [PATCH 5/5] Fix typo... --- bin/download_tns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/download_tns.py b/bin/download_tns.py index df801f66..50d8d3c8 100644 --- a/bin/download_tns.py +++ b/bin/download_tns.py @@ -81,7 +81,7 @@ def main(): ) # make the rowkey lower case - df_index = df_index.withColumn('index_row_key_name', F.lower('index_row_key_name')) + df_index = df_index.withColumn(index_row_key_name, F.lower(index_row_key_name)) cf = {i: 'd' for i in df_index.columns}