From 35560275cf935c9ad850486b60b9aaeec3f9ac34 Mon Sep 17 00:00:00 2001
From: Julien
Date: Tue, 10 Dec 2024 08:17:04 +0100
Subject: [PATCH] Stop pushing cutout data to HBase main table (#922)

* Stop pushing cutout data to HBase main table

* PEP8

* Fix number of args when calling

* Fix test output
---
 bin/index_archival.py      |  4 +--
 bin/science_archival.py    |  3 ++
 fink_broker/hbase_utils.py | 62 ++++++++++++--------------------------
 3 files changed, 25 insertions(+), 44 deletions(-)

diff --git a/bin/index_archival.py b/bin/index_archival.py
index 1acb4c18..e9ace204 100644
--- a/bin/index_archival.py
+++ b/bin/index_archival.py
@@ -72,11 +72,11 @@ def main():
     data = data.drop("year").drop("month").drop("day")

     # Check all columns exist, fill if necessary, and cast data
-    df_flat, cols_i, cols_d, cols_b = bring_to_current_schema(data)
+    df_flat, cols_i, cols_d = bring_to_current_schema(data)

     # Assign each column to a specific column family
     # This is independent from the final structure
-    cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b)
+    cf = assign_column_family_names(df_flat, cols_i, cols_d)

     # Restrict the input DataFrame to the subset of wanted columns,
     # except for tables containing uppervalid & upper limit data
diff --git a/bin/science_archival.py b/bin/science_archival.py
index dc9b47b5..cdbd6be0 100644
--- a/bin/science_archival.py
+++ b/bin/science_archival.py
@@ -64,6 +64,9 @@ def main():
     # Drop partitioning columns
     df = df.drop("year").drop("month").drop("day")

+    # Drop images
+    df = df.drop("cutoutScience").drop("cutoutTemplate").drop("cutoutDifference")
+
     # push data to HBase
     push_full_df_to_hbase(
         df,
diff --git a/fink_broker/hbase_utils.py b/fink_broker/hbase_utils.py
index a542dec5..82bbe318 100644
--- a/fink_broker/hbase_utils.py
+++ b/fink_broker/hbase_utils.py
@@ -105,10 +105,10 @@ def load_all_cols():

     Examples
     --------
-    >>> root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
-    >>> out = {**root_level, **candidates, **images, **fink_cols, **fink_nested_cols}
+    >>> root_level, candidates, fink_cols, fink_nested_cols = load_all_cols()
+    >>> out = {**root_level, **candidates, **fink_cols, **fink_nested_cols}
     >>> print(len(out))
-    159
+    156
     """
     fink_cols, fink_nested_cols = load_fink_cols()

@@ -228,13 +228,7 @@ def load_all_cols():

     candidates = {"candidate." + k: v for k, v in candidates.items()}

-    images = {
-        "cutoutScience.stampData": "binary",
-        "cutoutTemplate.stampData": "binary",
-        "cutoutDifference.stampData": "binary",
-    }
-
-    return root_level, candidates, images, fink_cols, fink_nested_cols
+    return root_level, candidates, fink_cols, fink_nested_cols


 def bring_to_current_schema(df):
@@ -250,11 +244,10 @@ def bring_to_current_schema(df):
     out: DataFrame
         Spark DataFrame with HBase data structure
     """
-    root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
+    root_level, candidates, fink_cols, fink_nested_cols = load_all_cols()

     tmp_i = []
     tmp_d = []
-    tmp_b = []

     # assuming no missing columns
     for colname, coltype in root_level.items():
@@ -266,13 +259,6 @@ def bring_to_current_schema(df):
     cols_i = df.select(tmp_i).columns

-    # assuming no missing columns
-    for colname, coltype in images.items():
-        name = F.col(colname).alias(colname.replace(".", "_")).cast(coltype)
-        tmp_b.append(name)
-
-    cols_b = df.select(tmp_b).columns

     # check all columns exist, otherwise create it
     for colname, coltype_and_default in fink_cols.items():
         try:
@@ -315,10 +301,10 @@ def bring_to_current_schema(df):
     cols_d = df.select(tmp_d).columns

     # flatten names
-    cnames = tmp_i + tmp_d + tmp_b
+    cnames = tmp_i + tmp_d
     df = df.select(cnames)

-    return df, cols_i, cols_d, cols_b
+    return df, cols_i, cols_d


 def load_ztf_index_cols():
@@ -600,14 +586,12 @@ def select_relevant_columns(
     return df


-def assign_column_family_names(df, cols_i, cols_d, cols_b):
+def assign_column_family_names(df, cols_i, cols_d):
     """Assign a column family name to each column qualifier.

-    There are currently 3 column families:
+    There are currently 2 column families:
         - i: for column that identify the alert (original alert)
         - d: for column that further describe the alert (Fink added value)
-        - b: for binary types. It currently contains:
-            - binary gzipped FITS image

     The split is done in `bring_to_current_schema`.

@@ -628,7 +612,6 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
     """
     cf = {i: "i" for i in df.select(["`{}`".format(k) for k in cols_i]).columns}
     cf.update({i: "d" for i in df.select(["`{}`".format(k) for k in cols_d]).columns})
-    cf.update({i: "b" for i in df.select(["`{}`".format(k) for k in cols_b]).columns})

     return cf

@@ -667,9 +650,9 @@ def construct_hbase_catalog_from_flatten_schema(
     # Read alert from the raw database
     >>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)

-    >>> df_flat, cols_i, cols_d, cols_b = bring_to_current_schema(df)
+    >>> df_flat, cols_i, cols_d = bring_to_current_schema(df)

-    >>> cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b)
+    >>> cf = assign_column_family_names(df_flat, cols_i, cols_d)

     Attach the row key
     >>> df_rk = add_row_key(df_flat, 'objectId_jd', cols=['objectId', 'jd'])
@@ -751,15 +734,15 @@ def construct_schema_row(df, rowkeyname, version):
     >>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)

     # inplace replacement
-    >>> df = df.select(['objectId', 'candidate.jd', 'candidate.candid', F.col('cutoutScience.stampData').alias('cutoutScience_stampData')])
+    >>> df = df.select(['objectId', 'candidate.jd', 'candidate.candid'])
     >>> df = df.withColumn('schema_version', F.lit(''))
     >>> df = construct_schema_row(df, rowkeyname='schema_version', version='schema_v0')
     >>> df.show()
-    +--------+------+------+-----------------------+--------------+
-    |objectId|    jd|candid|cutoutScience_stampData|schema_version|
-    +--------+------+------+-----------------------+--------------+
-    |  string|double|  long|             fits/image|     schema_v0|
-    +--------+------+------+-----------------------+--------------+
+    +--------+------+------+--------------+
+    |objectId|    jd|candid|schema_version|
+    +--------+------+------+--------------+
+    |  string|double|  long|     schema_v0|
+    +--------+------+------+--------------+

     """
     # Grab the running Spark Session,
@@ -769,11 +752,6 @@ def construct_schema_row(df, rowkeyname, version):
     # Original df columns, but values are types.
     data = np.array([(c.jsonValue()["type"]) for c in df.schema], dtype="
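
Illustrative sketch (not part of the patch): a minimal end-to-end use of the API after this change, mirroring the updated doctests. Cutout columns are dropped up front (as bin/science_archival.py now does), `bring_to_current_schema` returns three values instead of four, and `assign_column_family_names` builds only the "i" and "d" column families. The parquet path is a placeholder, the `spark` session is assumed to exist, and placing `add_row_key` in `fink_broker.hbase_utils` is inferred from the doctests; these may differ in a real deployment.

from pyspark.sql import SparkSession

from fink_broker.hbase_utils import (
    add_row_key,
    assign_column_family_names,
    bring_to_current_schema,
)

spark = SparkSession.builder.getOrCreate()

# Placeholder path to a parquet sample of the science database (assumption)
sample_path = "/path/to/ztf_alert_sample"
df = spark.read.format("parquet").load(sample_path)

# Cutout images are no longer pushed to the main HBase table: drop them first
df = df.drop("cutoutScience").drop("cutoutTemplate").drop("cutoutDifference")

# The binary ("b") column family is gone: only cols_i and cols_d remain
df_flat, cols_i, cols_d = bring_to_current_schema(df)
cf = assign_column_family_names(df_flat, cols_i, cols_d)

# Attach the row key, as in the updated doctest
df_rk = add_row_key(df_flat, "objectId_jd", cols=["objectId", "jd"])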