diff --git a/bin/download_tns.py b/bin/download_tns.py
index 088a9d69..50d8d3c8 100644
--- a/bin/download_tns.py
+++ b/bin/download_tns.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pyspark.sql.functions as F
+
 import pandas as pd
 import argparse
 import os
@@ -24,7 +26,7 @@
 
 
 def format_tns_for_hbase(pdf: pd.DataFrame) -> pd.DataFrame:
-    """ Format the raw TNS data for HBase consumption
+    """ Format the raw TNS data for HBase ingestion
     """
     # Add new or rename columns
     pdf['fullname'] = pdf['name_prefix'] + ' ' + pdf['name']
@@ -78,6 +80,9 @@ def main():
         cols=columns
     )
 
+    # make the rowkey lower case
+    df_index = df_index.withColumn(index_row_key_name, F.lower(index_row_key_name))
+
     cf = {i: 'd' for i in df_index.columns}
 
     push_to_hbase(
diff --git a/bin/index_sso_resolver.py b/bin/index_sso_resolver.py
index d0c18860..4738da20 100644
--- a/bin/index_sso_resolver.py
+++ b/bin/index_sso_resolver.py
@@ -60,17 +60,48 @@ def mark_as_duplicate(series, count=0):
     """
     if count == 0:
         # initialise markers
-        series = series.apply(lambda x: x + "_{}".format(count))
+        series = series.apply(lambda x: x + "-{}".format(count))
 
     mask = series.duplicated(keep='first')
     if np.sum(mask) > 0:
         # duplicates exists
-        series[mask] = series[mask].apply(lambda x: x.replace('_{}'.format(count), '_{}'.format(count + 1)))
+        series[mask] = series[mask].apply(
+            lambda x: x.replace('-{}'.format(count), '-{}'.format(count + 1))
+        )
         return mark_as_duplicate(series.copy(), count + 1)
     else:
         return series
 
 
+def sort_indices(pdf, column='ssodnet'):
+    """ Isolate decimal values in `column`, and sort the DataFrame in ascending order
+
+    Parameters
+    ----------
+    pdf: pd.DataFrame
+        Pandas DataFrame
+    column: str
+        Column name
+
+    Returns
+    -------
+    pdf: pd.DataFrame
+        Sorted DataFrame
+    """
+    is_decimal = pdf[column].apply(lambda x: x.isdecimal())
+
+    pdf1 = pdf[is_decimal]
+    pdf1[column] = pdf1[column].astype(int)
+    pdf1 = pdf1.sort_values(column, ascending=True)
+    pdf1[column] = pdf1[column].astype(str)
+
+    pdf2 = pdf[~is_decimal]
+
+    pdf = pd.concat((pdf1, pdf2))
+
+    return pdf
+
+
 def main():
     parser = argparse.ArgumentParser(description=__doc__)
@@ -114,27 +145,26 @@ def main():
     sso_name, sso_number = rockify(pdf.ssnamenr.copy())
 
     # fill None with values from original ssnamenr
-    mask1 = np.equal(sso_name, None)
-    sso_name[mask1] = pdf.ssnamenr[mask1]
+    has_no_name = np.equal(sso_name, None)
+    sso_name[has_no_name] = pdf.ssnamenr[has_no_name]
 
     # Keep only valid number
-    mask2 = np.equal(sso_number, sso_number)
-    sso_number_valid = sso_number[mask2]
+    is_valid_number = np.equal(sso_number, sso_number)
+    sso_number_valid = sso_number[is_valid_number]
 
     # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr)
     index_ssodnet = np.concatenate((sso_name, sso_number_valid, pdf.ssnamenr.values))
 
-    # Source identifier
-    index_source = np.concatenate(
+    # create index vector for Fink
+    index_fink = np.concatenate(
         (
-            ['name'] * len(sso_name),
-            ['number'] * len(sso_number_valid),
-            ['ssnamenr'] * len(pdf)
+            pdf.ssnamenr.values,
+            pdf.ssnamenr[is_valid_number].values,
+            pdf.ssnamenr.values
         )
     )
-
-    # create index vector for Fink
-    index_fink = np.concatenate((pdf.ssnamenr.values, pdf.ssnamenr[mask2].values, pdf.ssnamenr.values))
+    index_name = np.concatenate((sso_name, sso_name[is_valid_number], sso_name))
+    index_number = np.concatenate((sso_number, sso_number_valid, sso_number))
 
     msg = """
     Number of (unique) SSO objects in Fink: {:,}
@@ -149,8 +179,8 @@ def main():
         len(pdf),
         len(sso_number_valid),
         len(sso_number) - len(sso_number_valid),
-        np.sum(~mask1 * ~mask2),
-        np.sum(mask1)
+        np.sum(~has_no_name * ~is_valid_number),
+        np.sum(has_no_name)
     )
 
     logger.info(msg)
@@ -159,14 +189,20 @@ def main():
         {
             'ssodnet': index_ssodnet,
             'ssnamenr': index_fink,
-            'source': index_source
+            'name': index_name,
+            'number': index_number
         },
         dtype=str
     )
 
+    pdf_index = sort_indices(pdf_index)
+
     index_col = mark_as_duplicate(pdf_index['ssodnet'].copy())
     pdf_index['ssodnet'] = index_col
 
+    # Make the ssodnet lower case (case-insensitive)
+    pdf_index['ssodnet'] = pdf_index['ssodnet'].apply(lambda x: x.lower())
+
     df_index = spark.createDataFrame(pdf_index)
 
     cf = {i: 'i' for i in df_index.columns}