
Commit

Fixes open-metadata#13052: Datalake Nested Columns Sample Data ingest…
ayush-shah authored Oct 8, 2023
1 parent 21bba1c commit 08d7ee6
Showing 17 changed files with 184 additions and 174 deletions.
1 change: 0 additions & 1 deletion ingestion/src/metadata/mixins/pandas/pandas_mixin.py
@@ -95,7 +95,6 @@ def return_ometa_dataframes_sampled(
bucket_name=table.databaseSchema.name,
file_extension=table.fileFormat,
),
is_profiler=True,
)
if data:
random.shuffle(data)
@@ -16,8 +16,9 @@
"""
import traceback
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from typing import Dict, List
from typing import Dict, List, Optional

from sqlalchemy import Column

@@ -31,6 +32,7 @@
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR
from metadata.utils.datalake.datalake_utils import fetch_col_types, fetch_dataframe
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn
@@ -71,11 +73,13 @@ def __init__(
table_partition_config,
thread_count,
timeout_seconds,
**kwargs,
)

self.client = self.connection.client
self._table = self.table_entity
self.dfs = self._convert_table_to_list_of_dataframe_objects()
self.sampler = self._get_sampler()
self.complex_dataframe_sample = deepcopy(self.sampler.random_sample())

def _convert_table_to_list_of_dataframe_objects(self):
"""From a table entity, return the corresponding dataframe object
@@ -91,7 +95,6 @@ def _convert_table_to_list_of_dataframe_objects(self):
bucket_name=self.table_entity.databaseSchema.name,
file_extension=self.table_entity.fileFormat,
),
is_profiler=True,
)

if not data:
@@ -238,14 +241,12 @@ def compute_metrics(
):
"""Run metrics in processor worker"""
logger.debug(f"Running profiler for {table}")
sampler = self._get_sampler()
dfs = sampler.random_sample()
try:
row = None
if self.dfs:
if self.complex_dataframe_sample:
row = self._get_metric_fn[metric_type.value](
metrics,
dfs,
self.complex_dataframe_sample,
column=column,
)
except Exception as exc:
@@ -270,8 +271,7 @@ def fetch_sample_data(self, table) -> TableData:
Returns:
TableData: sample table data
"""
sampler = self._get_sampler()
return sampler.fetch_sample_data()
return self.sampler.fetch_sample_data()

def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict
@@ -307,7 +307,7 @@ def get_hybrid_metrics(
dictionary of results
"""
try:
return metric(column).df_fn(column_results, self.dfs)
return metric(column).df_fn(column_results, self.complex_dataframe_sample)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception computing metrics: {exc}")
@@ -346,18 +346,31 @@ def get_all_metrics(
@property
def table(self):
"""OM Table entity"""
return self._table

def get_columns(self):
if self.dfs:
df = self.dfs[0]
return [
SQALikeColumn(
column_name,
fetch_col_types(df, column_name),
return self.table_entity

def get_columns(self) -> List[Optional[SQALikeColumn]]:
"""Get SQALikeColumns for datalake to be passed for metric computation"""
sqalike_columns = []
if self.complex_dataframe_sample:
for column_name in self.complex_dataframe_sample[0].columns:
complex_col_name = None
if COMPLEX_COLUMN_SEPARATOR in column_name:
complex_col_name = ".".join(
column_name.split(COMPLEX_COLUMN_SEPARATOR)[1:]
)
if complex_col_name:
for df in self.complex_dataframe_sample:
df.rename(
columns={column_name: complex_col_name}, inplace=True
)
column_name = complex_col_name or column_name
sqalike_columns.append(
SQALikeColumn(
column_name,
fetch_col_types(self.complex_dataframe_sample[0], column_name),
)
)
for column_name in df.columns
]
return sqalike_columns
return []

def close(self):
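
The rewritten get_columns above works off the cached complex_dataframe_sample: flattened nested-column names are rewritten back to dotted paths in every sampled chunk before SQALikeColumn objects are built via fetch_col_types. A self-contained sketch of the renaming idea in plain pandas, using a hypothetical separator value in place of COMPLEX_COLUMN_SEPARATOR:

    import pandas as pd

    SEP = "_##_"  # hypothetical stand-in for COMPLEX_COLUMN_SEPARATOR

    def rename_flattened_columns(chunks):
        """Rename 'a<SEP>b' style columns to 'a.b' in every sampled chunk."""
        for column_name in list(chunks[0].columns):
            if SEP in column_name:
                dotted = ".".join(part for part in column_name.split(SEP) if part)
                for df in chunks:
                    df.rename(columns={column_name: dotted}, inplace=True)
        return chunks

    chunks = [pd.DataFrame({f"address{SEP}city": ["Paris"], "name": ["John"]})]
    print(rename_flattened_columns(chunks)[0].columns.tolist())
    # ['address.city', 'name']
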
15 changes: 12 additions & 3 deletions ingestion/src/metadata/profiler/metrics/static/distinct_count.py
@@ -19,6 +19,7 @@

from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger

logger = profiler_logger()
@@ -50,17 +51,25 @@ def df_fn(self, dfs=None):
"""
Distinct Count metric for Datalake
"""
from collections import Counter # pylint: disable=import-outside-toplevel
# pylint: disable=import-outside-toplevel
from collections import Counter

import pandas as pd

try:
counter = Counter()
for df in dfs:
df_col_value = df[self.col.name].dropna().to_list()
df_col = df[self.col.name].dropna()
df_col_value = (
pd.to_numeric(df_col).to_list()
if is_quantifiable(self.col.type)
else df_col.to_list()
)
counter.update(df_col_value)
return len(counter.keys())
except Exception as err:
logger.debug(
f"Don't know how to process type {self.col.type}"
f"when computing Distinct Count.\n Error: {err}"
f" when computing Distinct Count.\n Error: {err}"
)
return 0
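
The pd.to_numeric coercion matters when chunks of the same quantifiable column arrive with mixed dtypes, for example one chunk read as strings and another as integers; without it the Counter would treat 1 and "1" as two distinct values. A minimal standalone illustration of the pattern (toy data, not the profiler API):

    from collections import Counter

    import pandas as pd

    chunks = [pd.DataFrame({"age": ["1", "2"]}), pd.DataFrame({"age": [1, 3]})]
    counter = Counter()
    for df in chunks:
        # coerce to numbers so "1" (str) and 1 (int) collapse into the same key
        counter.update(pd.to_numeric(df["age"].dropna()).to_list())
    print(len(counter))  # 3 distinct values: 1, 2, 3
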
7 changes: 5 additions & 2 deletions ingestion/src/metadata/profiler/metrics/static/stddev.py
@@ -106,10 +106,13 @@ def df_fn(self, dfs=None):

if is_quantifiable(self.col.type):
try:
return pd.concat(df[self.col.name] for df in dfs).std()
merged_df = pd.to_numeric(pd.concat(df[self.col.name] for df in dfs))
if len(merged_df) > 1:
return merged_df.std()
return 0
except MemoryError:
logger.error(
f"Unable to compute distinctCount for {self.col.name} due to memory constraints."
f"Unable to compute Standard Deviation for {self.col.name} due to memory constraints."
f"We recommend using a smaller sample size or partitionning."
)
return None
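
The new length guard avoids a NaN result: pandas Series.std() uses ddof=1 by default, so a sample with a single row has no defined standard deviation and yields NaN, which the change maps to 0 instead. A quick sketch of that behaviour:

    import pandas as pd

    single = pd.to_numeric(pd.Series(["42"]))
    print(single.std())                             # nan (ddof=1 on one value)
    print(single.std() if len(single) > 1 else 0)   # 0, mirroring the guard above
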
5 changes: 4 additions & 1 deletion ingestion/src/metadata/profiler/metrics/static/sum.py
@@ -48,6 +48,9 @@ def fn(self):

def df_fn(self, dfs=None):
"""pandas function"""
# pylint: disable=import-outside-toplevel
import pandas as pd

if is_quantifiable(self.col.type):
return sum(df[self.col.name].sum() for df in dfs)
return sum(pd.to_numeric(df[self.col.name]).sum() for df in dfs)
return None
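
Summing each chunk separately and adding the partial results keeps memory usage flat, and pd.to_numeric again covers numeric columns that were read as strings. A small sketch with made-up chunk data:

    import pandas as pd

    dfs = [pd.DataFrame({"amount": ["1.5", "2.5"]}), pd.DataFrame({"amount": [4, 2]})]
    total = sum(pd.to_numeric(df["amount"]).sum() for df in dfs)
    print(total)  # 10.0
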
@@ -77,6 +77,6 @@ def df_fn(self, dfs=None):
except Exception as err:
logger.debug(
f"Don't know how to process type {self.col.type}"
f"when computing Unique Count.\n Error: {err}"
f" when computing Unique Count.\n Error: {err}"
)
return 0
@@ -83,15 +83,15 @@ def df_fn(self, dfs=None):
# the entire set. Median of Medians could be used
# though it would required set to be sorted before hand
try:
df = pd.concat(dfs)
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."
f"We recommend using a smaller sample size or partitionning."
)
return None
# check if nan
first_quartile = df[self.col.name].quantile(0.25, interpolation="midpoint")
first_quartile = df.quantile(0.25, interpolation="midpoint")
return None if pd.isnull(first_quartile) else first_quartile
logger.debug(
f"Don't know how to process type {self.col.type} when computing First Quartile"
4 changes: 2 additions & 2 deletions ingestion/src/metadata/profiler/metrics/window/median.py
@@ -82,14 +82,14 @@ def df_fn(self, dfs=None):
# the entire set. Median of Medians could be used
# though it would required set to be sorted before hand
try:
df = pd.concat(dfs)
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."
f"We recommend using a smaller sample size or partitionning."
)
return None
median = df[self.col.name].median()
median = df.median()
return None if pd.isnull(median) else median
logger.debug(
f"Don't know how to process type {self.col.type} when computing Median"
@@ -83,15 +83,15 @@ def df_fn(self, dfs=None):
# the entire set. Median of Medians could be used
# though it would required set to be sorted before hand
try:
df = pd.concat(dfs)
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."
f"We recommend using a smaller sample size or partitionning."
)
return None
# check if nan
third_quartile = df[self.col.name].quantile(0.75, interpolation="midpoint")
third_quartile = df.quantile(0.75, interpolation="midpoint")
return None if pd.isnull(third_quartile) else third_quartile
logger.debug(
f"Don't know how to process type {self.col.type} when computing Third Quartile"
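
First quartile, median and third quartile now concatenate only the target column across chunks, instead of concatenating whole dataframes, and coerce it to numeric before computing the statistic. A standalone sketch of that shared pattern (toy data):

    import pandas as pd

    dfs = [pd.DataFrame({"price": ["1", "2"]}), pd.DataFrame({"price": [3, 4]})]
    series = pd.to_numeric(pd.concat([df["price"] for df in dfs]))

    print(series.quantile(0.25, interpolation="midpoint"))  # 1.5
    print(series.median())                                  # 2.5
    print(series.quantile(0.75, interpolation="midpoint"))  # 3.5
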
81 changes: 64 additions & 17 deletions ingestion/src/metadata/profiler/processor/sampler/pandas/sampler.py
@@ -12,6 +12,7 @@
Helper module to handle data sampling
for the profiler
"""
import json
import math
import random
from typing import cast
@@ -25,7 +26,9 @@
ProfileSampleType,
TableData,
)
from metadata.ingestion.source.database.datalake.columns import _get_root_col
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR


class DatalakeSampler(SamplerInterface):
@@ -34,9 +37,6 @@ class DatalakeSampler(SamplerInterface):
run the query in the whole table.
"""

def _fetch_rows(self, data_frame):
return data_frame.dropna().values.tolist()

def _partitioned_table(self):
"""Get partitioned table"""
self._partition_details = cast(PartitionProfilerConfig, self._partition_details)
@@ -121,20 +121,6 @@ def _get_sampled_dataframe(self):
for df in self.table
]

def get_col_row(self, data_frame):
"""
Fetches columns and rows from the data_frame
"""
cols = []
rows = []
cols = data_frame[0].columns.tolist()
# Sample Data should not exceed sample limit
for chunk in data_frame:
rows.extend(self._fetch_rows(chunk)[: self.sample_limit])
if len(rows) >= self.sample_limit:
break
return cols, rows

def random_sample(self):
"""Generate random sample from the table
@@ -152,6 +138,67 @@

return self._get_sampled_dataframe()

def get_col_row(self, data_frame):
"""
Fetches columns and rows from the data_frame
"""
result_rows = []

for chunk in data_frame:
row_df = self._fetch_rows_df(chunk)
result_rows.extend(row_df.values.tolist()[: self.sample_limit])
if len(result_rows) >= self.sample_limit:
break
cols = row_df.columns.tolist()
return cols, result_rows

@staticmethod
def unflatten_dict(flat_dict):
unflattened_dict = {}
for key, value in flat_dict.items():
keys = key.split(".")
current_dict = unflattened_dict

for key in keys[:-1]:
current_dict = current_dict.setdefault(key, {})

current_dict[keys[-1]] = value

return unflattened_dict

def _fetch_rows_df(self, data_frame):
# pylint: disable=import-outside-toplevel
import numpy as np
import pandas as pd

complex_columns = list(
set(
_get_root_col(col)
for col in data_frame.columns
if COMPLEX_COLUMN_SEPARATOR in col
)
)
for complex_col in complex_columns or []:
for df_col in data_frame.columns:
if complex_col in df_col:
complex_col_name = ".".join(
df_col.split(COMPLEX_COLUMN_SEPARATOR)[1:]
)
if complex_col_name:
data_frame.rename(
columns={df_col: complex_col_name},
inplace=True,
)
return pd.json_normalize(
[
self.unflatten_dict(json.loads(row_values))
for row_values in data_frame.apply(
lambda row: row.to_json(), axis=1
).values
],
max_level=0,
).replace(np.nan, None)

def fetch_sample_data(self) -> TableData:
"""Fetch sample data from the table
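
In the sampler, _fetch_rows_df reverses the ingestion-time flattening before sample rows are emitted: each row is serialized to JSON, the dotted keys are unflattened back into nested dicts, and pd.json_normalize(..., max_level=0) keeps each nested object as a single cell. A simplified, self-contained sketch of that round trip, starting from already-dotted column names (the real code first renames COMPLEX_COLUMN_SEPARATOR columns, as shown above):

    import json

    import numpy as np
    import pandas as pd

    def unflatten_dict(flat_dict):
        """Turn {'a.b': 1} into {'a': {'b': 1}}, mirroring the helper in the diff."""
        unflattened = {}
        for key, value in flat_dict.items():
            keys = key.split(".")
            current = unflattened
            for part in keys[:-1]:
                current = current.setdefault(part, {})
            current[keys[-1]] = value
        return unflattened

    df = pd.DataFrame({"name": ["John"], "address.city": ["Paris"], "address.zip": [75001]})
    rows = [
        unflatten_dict(json.loads(row_values))
        for row_values in df.apply(lambda row: row.to_json(), axis=1).values
    ]
    sample = pd.json_normalize(rows, max_level=0).replace(np.nan, None)
    print(sample.to_dict(orient="records"))
    # [{'name': 'John', 'address': {'city': 'Paris', 'zip': 75001}}]
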
8 changes: 1 addition & 7 deletions ingestion/src/metadata/readers/dataframe/json.py
@@ -45,11 +45,7 @@ class JSONDataFrameReader(DataFrameReader):

@staticmethod
def read_from_json(
key: str,
json_text: bytes,
decode: bool = False,
is_profiler: bool = False,
**__
key: str, json_text: bytes, decode: bool = False, **__
) -> List["DataFrame"]:
"""
Decompress a JSON file (if needed) and read its contents
@@ -69,8 +65,6 @@ def read_from_json(
logger.debug("Failed to read as JSON object. Trying to read as JSON Lines")
data = [json.loads(json_obj) for json_obj in json_text.strip().split("\n")]

if is_profiler:
return dataframe_to_chunks(json_normalize(data))
return dataframe_to_chunks(json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR))

def _read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper:
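
With the is_profiler flag removed, every JSON read flattens nested objects with COMPLEX_COLUMN_SEPARATOR and the profiler un-flattens them later (see the sampler change above), so the reader no longer needs a profiler-specific code path. The effect of the sep argument, shown with a hypothetical separator value:

    from pandas import json_normalize

    data = [{"name": "John", "address": {"city": "Paris", "zip": 75001}}]

    print(json_normalize(data).columns.tolist())
    # ['name', 'address.city', 'address.zip']        <- default "." separator
    print(json_normalize(data, sep="_##_").columns.tolist())
    # ['name', 'address_##_city', 'address_##_zip']  <- separator-based flattening for all reads
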
