[HBase] update rowkeys for resolvers (#777)
* Change rowkey format for SSO resolver

* Make the rowkey lower case

* Make the TNS resolver table rowkey lower case

* PEP8

* Fix typo...
JulienPeloton authored Jan 5, 2024
1 parent f2c00ca commit aaa5ab6
Showing 2 changed files with 59 additions and 18 deletions.
7 changes: 6 additions & 1 deletion bin/download_tns.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import pyspark.sql.functions as F
+
 import pandas as pd
 import argparse
 import os
@@ -24,7 +26,7 @@


 def format_tns_for_hbase(pdf: pd.DataFrame) -> pd.DataFrame:
-    """ Format the raw TNS data for HBase consumption
+    """ Format the raw TNS data for HBase ingestion
     """
     # Add new or rename columns
     pdf['fullname'] = pdf['name_prefix'] + ' ' + pdf['name']
@@ -78,6 +80,9 @@ def main():
         cols=columns
     )

+    # make the rowkey lower case
+    df_index = df_index.withColumn(index_row_key_name, F.lower(index_row_key_name))
+
     cf = {i: 'd' for i in df_index.columns}

     push_to_hbase(
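Note on the added line: pyspark.sql.functions.lower accepts a column name string as well as a Column, which is why passing index_row_key_name directly works. A minimal, self-contained sketch of the same normalisation; the local SparkSession and the 'index' column name are illustrative only, not the repository's actual setup:

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # toy rowkeys standing in for TNS identifiers
    df = spark.createDataFrame([("SN 2024abc",), ("AT 2024xyz",)], ["index"])

    # lower-case the rowkey column, as the new line above does
    df = df.withColumn("index", F.lower("index"))
    df.show()  # values become 'sn 2024abc', 'at 2024xyz'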
70 changes: 53 additions & 17 deletions bin/index_sso_resolver.py
@@ -60,17 +60,48 @@ def mark_as_duplicate(series, count=0):
     """
     if count == 0:
         # initialise markers
-        series = series.apply(lambda x: x + "_{}".format(count))
+        series = series.apply(lambda x: x + "-{}".format(count))

     mask = series.duplicated(keep='first')
     if np.sum(mask) > 0:
         # duplicates exist
-        series[mask] = series[mask].apply(lambda x: x.replace('_{}'.format(count), '_{}'.format(count + 1)))
+        series[mask] = series[mask].apply(
+            lambda x: x.replace('-{}'.format(count), '-{}'.format(count + 1))
+        )
         return mark_as_duplicate(series.copy(), count + 1)
     else:
         return series


+def sort_indices(pdf, column='ssodnet'):
+    """ Isolate decimal values in `column`, and sort the DataFrame in ascending order
+
+    Parameters
+    ----------
+    pdf: pd.DataFrame
+        Pandas DataFrame
+    column: str
+        Column name
+
+    Returns
+    ----------
+    pdf: pd.DataFrame
+        Sorted DataFrame
+    """
+    is_decimal = pdf[column].apply(lambda x: x.isdecimal())
+
+    pdf1 = pdf[is_decimal]
+    pdf1[column] = pdf1[column].astype(int)
+    pdf1 = pdf1.sort_values(column, ascending=True)
+    pdf1[column] = pdf1[column].astype(str)
+
+    pdf2 = pdf[~is_decimal]
+
+    pdf = pd.concat((pdf1, pdf2))
+
+    return pdf
+
+
 def main():
     parser = argparse.ArgumentParser(description=__doc__)

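For illustration, a self-contained sketch of the '-N' duplicate markers with invented values (the real input is the ssodnet column built in main below):

    import numpy as np
    import pandas as pd

    def mark_as_duplicate(series, count=0):
        # first pass: suffix every value with '-0'
        if count == 0:
            series = series.apply(lambda x: x + "-{}".format(count))
        mask = series.duplicated(keep='first')
        if np.sum(mask) > 0:
            # bump the marker on later occurrences, then look again
            series[mask] = series[mask].apply(
                lambda x: x.replace('-{}'.format(count), '-{}'.format(count + 1))
            )
            return mark_as_duplicate(series.copy(), count + 1)
        return series

    keys = pd.Series(['Ceres', 'Ceres', 'Pallas'])
    print(mark_as_duplicate(keys).tolist())  # ['Ceres-0', 'Ceres-1', 'Pallas-0']

The new sort_indices helper complements this by putting purely decimal rowkeys first, ordered numerically rather than lexicographically, ahead of all non-decimal ones.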
@@ -114,27 +145,26 @@ def main():
     sso_name, sso_number = rockify(pdf.ssnamenr.copy())

     # fill None with values from original ssnamenr
-    mask1 = np.equal(sso_name, None)
-    sso_name[mask1] = pdf.ssnamenr[mask1]
+    has_no_name = np.equal(sso_name, None)
+    sso_name[has_no_name] = pdf.ssnamenr[has_no_name]

     # Keep only valid number
-    mask2 = np.equal(sso_number, sso_number)
-    sso_number_valid = sso_number[mask2]
+    is_valid_number = np.equal(sso_number, sso_number)
+    sso_number_valid = sso_number[is_valid_number]

     # Vector contains (MPC_names, MPC_numbers, ZTF_ssnamenr)
     index_ssodnet = np.concatenate((sso_name, sso_number_valid, pdf.ssnamenr.values))

-    # Source identifier
-    index_source = np.concatenate(
+    # create index vector for Fink
+    index_fink = np.concatenate(
         (
-            ['name'] * len(sso_name),
-            ['number'] * len(sso_number_valid),
-            ['ssnamenr'] * len(pdf)
+            pdf.ssnamenr.values,
+            pdf.ssnamenr[is_valid_number].values,
+            pdf.ssnamenr.values
         )
     )

-    # create index vector for Fink
-    index_fink = np.concatenate((pdf.ssnamenr.values, pdf.ssnamenr[mask2].values, pdf.ssnamenr.values))
+    index_name = np.concatenate((sso_name, sso_name[is_valid_number], sso_name))
+    index_number = np.concatenate((sso_number, sso_number_valid, sso_number))

     msg = """
     Number of (unique) SSO objects in Fink: {:,}
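The renamed masks rely on a numpy idiom worth spelling out: NaN is the only value not equal to itself, so np.equal(x, x) is False exactly where a number is missing, and multiplying boolean arrays acts as an elementwise AND. A toy check with invented values:

    import numpy as np

    # one numbered object, one named-only object, one with neither
    sso_number = np.array([1.0, np.nan, np.nan])
    has_no_name = np.array([False, False, True])

    # True only where a real MPC number exists
    is_valid_number = np.equal(sso_number, sso_number)
    print(is_valid_number)  # [ True False False]

    # count objects that have a name but no valid number
    print(np.sum(~has_no_name * ~is_valid_number))  # 1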
@@ -149,8 +179,8 @@ def main():
         len(pdf),
         len(sso_number_valid),
         len(sso_number) - len(sso_number_valid),
-        np.sum(~mask1 * ~mask2),
-        np.sum(mask1)
+        np.sum(~has_no_name * ~is_valid_number),
+        np.sum(has_no_name)
     )

     logger.info(msg)
@@ -159,14 +189,20 @@ def main():
         {
             'ssodnet': index_ssodnet,
             'ssnamenr': index_fink,
-            'source': index_source
+            'name': index_name,
+            'number': index_number
         },
         dtype=str
     )

+    pdf_index = sort_indices(pdf_index)
+
     index_col = mark_as_duplicate(pdf_index['ssodnet'].copy())
     pdf_index['ssodnet'] = index_col

+    # Make the ssodnet lower case (case-insensitive)
+    pdf_index['ssodnet'] = pdf_index['ssodnet'].apply(lambda x: x.lower())
+
     df_index = spark.createDataFrame(pdf_index)

     cf = {i: 'i' for i in df_index.columns}
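With the sorting and duplicate markers above, the final step lower-cases every rowkey, so a resolver lookup can be made case-insensitive by normalising the query string the same way. A minimal sketch with invented values; the startswith lookup is only an assumed client-side pattern, not code from this repository:

    import pandas as pd

    pdf_index = pd.DataFrame({'ssodnet': ['Ceres-0', 'Ceres-1', 'Pallas-0']}, dtype=str)
    pdf_index['ssodnet'] = pdf_index['ssodnet'].apply(lambda x: x.lower())

    # a case-insensitive query against the lower-cased rowkeys
    query = 'CERES'
    matches = pdf_index[pdf_index['ssodnet'].str.startswith(query.lower())]
    print(matches['ssodnet'].tolist())  # ['ceres-0', 'ceres-1']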
