Stop pushing cutout data to HBase main table (#922)
* Stop pushing cutout data to HBase main table

* PEP8

* Fix number of args when calling

* Fix test output
JulienPeloton authored Dec 10, 2024
1 parent a5faf86 commit 3556027
Showing 3 changed files with 25 additions and 44 deletions.
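
In short, the cutout structs are dropped from the alert DataFrame before the main-table push, so the gzipped FITS stamps no longer land in HBase. A minimal sketch of the new flow, assuming a running Spark session and an illustrative input path (the real entry point is bin/science_archival.py below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Alerts from the science archive (path is illustrative)
df = spark.read.format("parquet").load("online/science/year=2024")

# Partitioning columns are not pushed to HBase
df = df.drop("year").drop("month").drop("day")

# New in this commit: image cutouts stay out of the main table
df = df.drop("cutoutScience").drop("cutoutTemplate").drop("cutoutDifference")

# Flattening, column-family assignment and the HBase push are unchanged
# (see fink_broker/hbase_utils.py below)
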
4 changes: 2 additions & 2 deletions bin/index_archival.py
@@ -72,11 +72,11 @@ def main():
data = data.drop("year").drop("month").drop("day")

# Check all columns exist, fill if necessary, and cast data
- df_flat, cols_i, cols_d, cols_b = bring_to_current_schema(data)
+ df_flat, cols_i, cols_d = bring_to_current_schema(data)

# Assign each column to a specific column family
# This is independent from the final structure
- cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b)
+ cf = assign_column_family_names(df_flat, cols_i, cols_d)

# Restrict the input DataFrame to the subset of wanted columns,
# except for tables containing uppervalid & upper limit data
3 changes: 3 additions & 0 deletions bin/science_archival.py
@@ -64,6 +64,9 @@ def main():
# Drop partitioning columns
df = df.drop("year").drop("month").drop("day")

+ # Drop images
+ df = df.drop("cutoutScience").drop("cutoutTemplate").drop("cutoutDifference")

# push data to HBase
push_full_df_to_hbase(
df,
62 changes: 20 additions & 42 deletions fink_broker/hbase_utils.py
@@ -105,10 +105,10 @@ def load_all_cols():
Examples
--------
- >>> root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
- >>> out = {**root_level, **candidates, **images, **fink_cols, **fink_nested_cols}
+ >>> root_level, candidates, fink_cols, fink_nested_cols = load_all_cols()
+ >>> out = {**root_level, **candidates, **fink_cols, **fink_nested_cols}
>>> print(len(out))
- 159
+ 156
"""
fink_cols, fink_nested_cols = load_fink_cols()

@@ -228,13 +228,7 @@ def load_all_cols():

candidates = {"candidate." + k: v for k, v in candidates.items()}

- images = {
-     "cutoutScience.stampData": "binary",
-     "cutoutTemplate.stampData": "binary",
-     "cutoutDifference.stampData": "binary",
- }
-
- return root_level, candidates, images, fink_cols, fink_nested_cols
+ return root_level, candidates, fink_cols, fink_nested_cols


def bring_to_current_schema(df):
@@ -250,11 +244,10 @@ def bring_to_current_schema(df):
out: DataFrame
Spark DataFrame with HBase data structure
"""
- root_level, candidates, images, fink_cols, fink_nested_cols = load_all_cols()
+ root_level, candidates, fink_cols, fink_nested_cols = load_all_cols()

tmp_i = []
tmp_d = []
- tmp_b = []

# assuming no missing columns
for colname, coltype in root_level.items():
@@ -266,13 +259,6 @@ def bring_to_current_schema(df):

cols_i = df.select(tmp_i).columns

- # assuming no missing columns
- for colname, coltype in images.items():
-     name = F.col(colname).alias(colname.replace(".", "_")).cast(coltype)
-     tmp_b.append(name)
-
- cols_b = df.select(tmp_b).columns

# check all columns exist, otherwise create it
for colname, coltype_and_default in fink_cols.items():
try:
@@ -315,10 +301,10 @@ def bring_to_current_schema(df):
cols_d = df.select(tmp_d).columns

# flatten names
- cnames = tmp_i + tmp_d + tmp_b
+ cnames = tmp_i + tmp_d
df = df.select(cnames)

- return df, cols_i, cols_d, cols_b
+ return df, cols_i, cols_d


def load_ztf_index_cols():
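
For context, this is the flattening pattern that remains once the tmp_b/cols_b branch is gone: every kept column is renamed to a flat qualifier and cast to its declared type. A small self-contained sketch with a toy schema (not the full ZTF alert):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy alert with a nested candidate struct
df = spark.createDataFrame(
    [("ZTF21aaaaaaa", (2460654.5,))],
    "objectId string, candidate struct<jd: double>",
)

root_level = {"objectId": "string", "candidate.jd": "double"}

# Same pattern as bring_to_current_schema: flatten the name, cast the type
tmp_i = [
    F.col(colname).alias(colname.replace(".", "_")).cast(coltype)
    for colname, coltype in root_level.items()
]

cols_i = df.select(tmp_i).columns  # ['objectId', 'candidate_jd']
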
@@ -600,14 +586,12 @@ def select_relevant_columns(
return df


- def assign_column_family_names(df, cols_i, cols_d, cols_b):
+ def assign_column_family_names(df, cols_i, cols_d):
"""Assign a column family name to each column qualifier.
- There are currently 3 column families:
+ There are currently 2 column families:
- i: for column that identify the alert (original alert)
- d: for column that further describe the alert (Fink added value)
- - b: for binary types. It currently contains:
-     - binary gzipped FITS image
The split is done in `bring_to_current_schema`.
@@ -628,7 +612,6 @@ def assign_column_family_names(df, cols_i, cols_d, cols_b):
"""
cf = {i: "i" for i in df.select(["`{}`".format(k) for k in cols_i]).columns}
cf.update({i: "d" for i in df.select(["`{}`".format(k) for k in cols_d]).columns})
cf.update({i: "b" for i in df.select(["`{}`".format(k) for k in cols_b]).columns})

return cf
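
With the b family gone, the result is simply a qualifier-to-family mapping over the two remaining families. A hedged illustration of the dictionary shape (column names are made up for the example):

cols_i = ["objectId", "candidate_jd", "candidate_magpsf"]  # original alert fields
cols_d = ["cdsxmatch", "rf_snia_vs_nonia"]  # Fink added values

# Same shape as the dict returned by assign_column_family_names
cf = {name: "i" for name in cols_i}
cf.update({name: "d" for name in cols_d})
# {'objectId': 'i', 'candidate_jd': 'i', 'candidate_magpsf': 'i',
#  'cdsxmatch': 'd', 'rf_snia_vs_nonia': 'd'}
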

@@ -667,9 +650,9 @@ def construct_hbase_catalog_from_flatten_schema(
# Read alert from the raw database
>>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)
- >>> df_flat, cols_i, cols_d, cols_b = bring_to_current_schema(df)
+ >>> df_flat, cols_i, cols_d = bring_to_current_schema(df)
- >>> cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b)
+ >>> cf = assign_column_family_names(df_flat, cols_i, cols_d)
Attach the row key
>>> df_rk = add_row_key(df_flat, 'objectId_jd', cols=['objectId', 'jd'])
@@ -751,15 +734,15 @@ def construct_schema_row(df, rowkeyname, version):
>>> df = spark.read.format("parquet").load(ztf_alert_sample_scidatabase)
# inplace replacement
- >>> df = df.select(['objectId', 'candidate.jd', 'candidate.candid', F.col('cutoutScience.stampData').alias('cutoutScience_stampData')])
+ >>> df = df.select(['objectId', 'candidate.jd', 'candidate.candid'])
>>> df = df.withColumn('schema_version', F.lit(''))
>>> df = construct_schema_row(df, rowkeyname='schema_version', version='schema_v0')
>>> df.show()
- +--------+------+------+-----------------------+--------------+
- |objectId|    jd|candid|cutoutScience_stampData|schema_version|
- +--------+------+------+-----------------------+--------------+
- |  string|double|  long|             fits/image|     schema_v0|
- +--------+------+------+-----------------------+--------------+
+ +--------+------+------+--------------+
+ |objectId|    jd|candid|schema_version|
+ +--------+------+------+--------------+
+ |  string|double|  long|     schema_v0|
+ +--------+------+------+--------------+
<BLANKLINE>
"""
# Grab the running Spark Session,
@@ -769,11 +752,6 @@ def construct_schema_row(df, rowkeyname, version):
# Original df columns, but values are types.
data = np.array([(c.jsonValue()["type"]) for c in df.schema], dtype="<U75")

- # binary types are too vague, so assign manually a description
- names = np.array([(c.jsonValue()["name"]) for c in df.schema])
- mask = np.array(["cutout" in i for i in names])
- data[mask] = "fits/image"

index = np.where(np.array(df.columns) == rowkeyname)[0][0]
data[index] = version
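
With the cutout relabeling removed, the schema row records each column's Spark type string verbatim. A sketch of that extraction on a two-column toy DataFrame:

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("ZTF21aaaaaaa", 2460654.5)], "objectId string, jd double")

# One type string per column, as in construct_schema_row
data = np.array([c.jsonValue()["type"] for c in df.schema], dtype="<U75")
# array(['string', 'double'], dtype='<U75')
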

@@ -879,14 +857,14 @@ def push_full_df_to_hbase(df, row_key_name, table_name, catalog_name):
df_casted = cast_features(df)

# Check all columns exist, fill if necessary, and cast data
- df_flat, cols_i, cols_d, cols_b = bring_to_current_schema(df_casted)
+ df_flat, cols_i, cols_d = bring_to_current_schema(df_casted)

# Assign each column to a specific column family
# This is independent from the final structure
- cf = assign_column_family_names(df_flat, cols_i, cols_d, cols_b)
+ cf = assign_column_family_names(df_flat, cols_i, cols_d)

# Restrict the input DataFrame to the subset of wanted columns.
- all_cols = cols_i + cols_d + cols_b
+ all_cols = cols_i + cols_d

df_flat = add_row_key(
df_flat, row_key_name=row_key_name, cols=row_key_name.split("_")
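
The row key handling itself is untouched by this commit. For reference, a hedged sketch of what an objectId_jd row key amounts to, assuming plain underscore concatenation (the actual add_row_key implementation is not part of this diff):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df_flat = spark.createDataFrame(
    [("ZTF21aaaaaaa", 2460654.5)], "objectId string, jd double"
)

row_key_name = "objectId_jd"
cols = row_key_name.split("_")  # ['objectId', 'jd']

# Assumed concatenation, only to illustrate the row key format
df_rk = df_flat.withColumn(
    row_key_name, F.concat_ws("_", *[F.col(c).cast("string") for c in cols])
)
# -> 'ZTF21aaaaaaa_2460654.5' for the toy row above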
