From 9e2a7f4901f33bdb6ec13750d9eee622d3992890 Mon Sep 17 00:00:00 2001 From: Ethan Cartwright Date: Tue, 5 Dec 2023 22:41:43 -0500 Subject: [PATCH] WIP exclusion addition --- .../src/datahub/configuration/common.py | 425 +++++++----------- .../ingestion/glossary/datahub_classifier.py | 42 +- test_classification.yml | 33 ++ 3 files changed, 241 insertions(+), 259 deletions(-) create mode 100644 test_classification.yml diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index f225856ca43ce4..d07707ff2f40a5 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -1,304 +1,215 @@ +from enum import Enum +from typing import Any, Dict, List, Optional import re -import unittest.mock -from abc import ABC, abstractmethod -from enum import auto -from typing import IO, Any, ClassVar, Dict, List, Optional, Type, TypeVar, Union - -import pydantic -from cached_property import cached_property -from pydantic import BaseModel, Extra, ValidationError +import logging +from datahub_classify.helper_classes import ColumnInfo +from datahub_classify.infotype_predictor import predict_infotypes +from datahub_classify.reference_input import input1 as default_config +from pydantic import validator from pydantic.fields import Field -from typing_extensions import Protocol, runtime_checkable -from datahub.configuration._config_enum import ConfigEnum -from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2 -from datahub.utilities.dedup_list import deduplicate_list +from datahub.configuration.common import ConfigModel, PermissiveConfigModel +from datahub.ingestion.glossary.classifier import Classifier -_ConfigSelf = TypeVar("_ConfigSelf", bound="ConfigModel") +logger: logging.Logger = logging.getLogger(__name__) -REDACT_KEYS = { - "password", - "token", - "secret", - "options", - "sqlalchemy_uri", -} -REDACT_SUFFIXES = { - "_password", - "_secret", - "_token", - "_key", - "_key_id", -} - -def _should_redact_key(key: Union[str, int]) -> bool: - return isinstance(key, str) and ( - key in REDACT_KEYS or any(key.endswith(suffix) for suffix in REDACT_SUFFIXES) +class NameFactorConfig(ConfigModel): + regex: List[str] = Field( + default=[".*"], + description="List of regex patterns the column name follows for the info type", ) -def _redact_value(value: Any) -> Any: - if isinstance(value, str): - # If it's just a variable reference, it's ok to show as-is. - if value.startswith("$"): - return value - return "********" - elif value is None: - return None - elif isinstance(value, bool): - # We don't have any sensitive boolean fields. - return value - elif isinstance(value, list) and not value: - # Empty states are fine. - return [] - elif isinstance(value, dict) and not value: - return {} - else: - return "********" - - -def redact_raw_config(obj: Any) -> Any: - if isinstance(obj, dict): - return { - k: _redact_value(v) if _should_redact_key(k) else redact_raw_config(v) - for k, v in obj.items() - } - elif isinstance(obj, list): - return [redact_raw_config(v) for v in obj] - else: - return obj - - -class ConfigModel(BaseModel): - class Config: - @staticmethod - def _schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None: - # We use the custom "hidden_from_docs" attribute to hide fields from the - # autogenerated docs. - remove_fields = [] - for key, prop in schema.get("properties", {}).items(): - if prop.get("hidden_from_docs"): - remove_fields.append(key) - - for key in remove_fields: - del schema["properties"][key] - - # This is purely to suppress pydantic's warnings, since this class is used everywhere. - if PYDANTIC_VERSION_2: - extra = "forbid" - ignored_types = (cached_property,) - json_schema_extra = _schema_extra - else: - extra = Extra.forbid - underscore_attrs_are_private = True - keep_untouched = ( - cached_property, - ) # needed to allow cached_property to work. See https://github.com/samuelcolvin/pydantic/issues/1241 for more info. - schema_extra = _schema_extra - - @classmethod - def parse_obj_allow_extras(cls: Type[_ConfigSelf], obj: Any) -> _ConfigSelf: - with unittest.mock.patch.object(cls.Config, "extra", pydantic.Extra.allow): - return cls.parse_obj(obj) - - -class PermissiveConfigModel(ConfigModel): - # A permissive config model that allows extra fields. - # This is useful for cases where we want to strongly type certain fields, - # but still allow the user to pass in arbitrary fields that we don't care about. - # It is usually used for argument bags that are passed through to third-party libraries. +class ExclusionNameConfig(ConfigModel): + regex: List[str] = Field( + default=[".*"], + description="List of regex patterns the column name follows for the info type", + ) - class Config: - if PYDANTIC_VERSION_2: - extra = "allow" - else: - extra = Extra.allow +class DescriptionFactorConfig(ConfigModel): + regex: List[str] = Field( + default=[".*"], + description="List of regex patterns the column description follows for the info type", + ) -class TransformerSemantics(ConfigEnum): - """Describes semantics for aspect changes""" - OVERWRITE = auto() # Apply changes blindly - PATCH = auto() # Only apply differences from what exists already on the server +class DataTypeFactorConfig(ConfigModel): + type: List[str] = Field( + default=[".*"], + description="List of data types for the info type", + ) -class TransformerSemanticsConfigModel(ConfigModel): - semantics: TransformerSemantics = TransformerSemantics.OVERWRITE - replace_existing: bool = False +class ValuePredictionType(str, Enum): + REGEX = "regex" + LIBRARY = "library" -class DynamicTypedConfig(ConfigModel): - type: str = Field( - description="The type of the dynamic object", - ) - # This config type is declared Optional[Any] here. The eventual parser for the - # specified type is responsible for further validation. - config: Optional[Any] = Field( +class ValuesFactorConfig(ConfigModel): + prediction_type: ValuePredictionType + regex: Optional[List[str]] = Field( default=None, - description="The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).", + description="List of regex patterns the column value follows for the info type", + ) + library: Optional[List[str]] = Field( + default=None, description="Library used for prediction" ) -# TODO: Many of these exception types are fairly specialized and shouldn't live in a common module. - - -class MetaError(Exception): - """A base class for all meta exceptions.""" - - -class PipelineExecutionError(MetaError): - """An error occurred when executing the pipeline.""" - - -class GraphError(MetaError): - """An error in communicating with the DataHub Graph.""" - - -class OperationalError(GraphError): - """A GraphError with extra debug annotations.""" - - message: str - info: dict - - def __init__(self, message: str, info: Optional[dict] = None): - self.message = message - self.info = info or {} - +class PredictionFactorsAndWeights(ConfigModel): + class Config: + allow_population_by_field_name = True -class ConfigurationError(MetaError): - """A configuration error.""" + Name: float = Field(alias="name") + Description: float = Field(alias="description") + Datatype: float = Field(alias="datatype") + Values: float = Field(alias="values") -class IgnorableError(MetaError): - """An error that can be ignored.""" +class ExclusionConfig(ConfigModel): + class Config: + allow_population_by_field_name = True + Name: Optional[ExclusionNameConfig] = Field(default=None, alias="name") -@runtime_checkable -class ExceptionWithProps(Protocol): - def get_telemetry_props(self) -> Dict[str, Any]: - ... + Description: Optional[DescriptionFactorConfig] = Field( + default=None, alias="description" + ) + Datatype: Optional[DataTypeFactorConfig] = Field(default=None, alias="datatype") -def should_show_stack_trace(exc: Exception) -> bool: - # Unless the exception is a ValidationError or explicitly opts out of stack traces, - # we should show the stack trace. + Values: Optional[ValuesFactorConfig] = Field(default=None, alias="values") - if isinstance(exc, ValidationError) or isinstance(exc.__cause__, ValidationError): - return False - return getattr(exc, "SHOW_STACK_TRACE", True) +class InfoTypeConfig(ConfigModel): + class Config: + allow_population_by_field_name = True + Prediction_Factors_and_Weights: PredictionFactorsAndWeights = Field( + description="Factors and their weights to consider when predicting info types", + alias="prediction_factors_and_weights", + ) + Name: Optional[NameFactorConfig] = Field(default=None, alias="name") -class ConfigurationWarning(Warning): - """A configuration warning.""" + Description: Optional[DescriptionFactorConfig] = Field( + default=None, alias="description" + ) + Datatype: Optional[DataTypeFactorConfig] = Field(default=None, alias="datatype") -class ConfigurationMechanism(ABC): - @abstractmethod - def load_config(self, config_fp: IO) -> dict: - pass + Values: Optional[ValuesFactorConfig] = Field(default=None, alias="values") -class AllowDenyPattern(ConfigModel): - """A class to store allow deny regexes""" +DEFAULT_CLASSIFIER_CONFIG = { + k: InfoTypeConfig.parse_obj(v) for k, v in default_config.items() +} - # This regex is used to check if a given rule is a regex expression or a literal. - # Note that this is not a perfect check. For example, the '.' character should - # be considered a regex special character, but it's used frequently in literal - # patterns and hence we allow it anyway. - IS_SIMPLE_REGEX: ClassVar = re.compile(r"^[A-Za-z0-9 _.-]+$") - allow: List[str] = Field( - default=[".*"], - description="List of regex patterns to include in ingestion", +# TODO: Generate Classification doc (classification.md) from python source. +class DataHubClassifierConfig(ConfigModel): + confidence_level_threshold: float = Field( + default=0.68, + description="The confidence threshold above which the prediction is considered as a proposal", ) - deny: List[str] = Field( - default=[], - description="List of regex patterns to exclude from ingestion.", + info_types: Optional[List[str]] = Field( + default=None, + description="List of infotypes to be predicted. By default, all supported infotypes are considered, along with any custom infotypes configured in `info_types_config`.", ) - ignoreCase: Optional[bool] = Field( - default=True, - description="Whether to ignore case sensitivity during pattern matching.", - ) # Name comparisons should default to ignoring case - - @property - def regex_flags(self) -> int: - return re.IGNORECASE if self.ignoreCase else 0 - - @classmethod - def allow_all(cls) -> "AllowDenyPattern": - return AllowDenyPattern() - - def allowed(self, string: str) -> bool: - for deny_pattern in self.deny: - if re.match(deny_pattern, string, self.regex_flags): - return False - - return any( - re.match(allow_pattern, string, self.regex_flags) - for allow_pattern in self.allow - ) - - def is_fully_specified_allow_list(self) -> bool: - """ - If the allow patterns are literals and not full regexes, then it is considered - fully specified. This is useful if you want to convert a 'list + filter' - pattern into a 'search for the ones that are allowed' pattern, which can be - much more efficient in some cases. - """ - return all( - self.IS_SIMPLE_REGEX.match(allow_pattern) for allow_pattern in self.allow - ) - - def get_allowed_list(self) -> List[str]: - """Return the list of allowed strings as a list, after taking into account deny patterns, if possible""" - assert self.is_fully_specified_allow_list() - return [a for a in self.allow if self.allowed(a)] - - def __eq__(self, other): # type: ignore - return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ - - -class KeyValuePattern(ConfigModel): - """ - The key-value pattern is used to map a regex pattern to a set of values. - For example, you can use it to map a table name to a list of tags to apply to it. - """ - - rules: Dict[str, List[str]] = {".*": []} - first_match_only: bool = Field( - default=True, - description="Whether to stop after the first match. If false, all matching rules will be applied.", + info_types_config: Dict[str, InfoTypeConfig] = Field( + default=DEFAULT_CLASSIFIER_CONFIG, + description="Configuration details for infotypes. See [reference_input.py](https://github.com/acryldata/datahub-classify/blob/main/datahub-classify/src/datahub_classify/reference_input.py) for default configuration.", + ) + exclusion_config: Dict[str, ExclusionConfig] = Field( + default=None, + description="Configuration details for infotypes. See [reference_input.py](https://github.com/acryldata/datahub-classify/blob/main/datahub-classify/src/datahub_classify/reference_input.py) for default configuration.", + ) + minimum_values_threshold: int = Field( + default=50, + description="Minimum number of non-null column values required to process `values` prediction factor.", ) + @validator("info_types_config") + def input_config_selectively_overrides_default_config(cls, info_types_config): + for infotype, infotype_config in DEFAULT_CLASSIFIER_CONFIG.items(): + if infotype not in info_types_config: + # if config for some info type is not provided by user, use default config for that info type. + info_types_config[infotype] = infotype_config + else: + # if config for info type is provided by user but config for its prediction factor is missing, + # use default config for that prediction factor. + for factor, weight in ( + info_types_config[infotype] + .Prediction_Factors_and_Weights.dict() + .items() + ): + if ( + weight > 0 + and getattr(info_types_config[infotype], factor) is None + ): + setattr( + info_types_config[infotype], + factor, + getattr(infotype_config, factor), + ) + # Custom info type + custom_infotypes = info_types_config.keys() - DEFAULT_CLASSIFIER_CONFIG.keys() + + for custom_infotype in custom_infotypes: + custom_infotype_config = info_types_config[custom_infotype] + # for custom infotype, config for every prediction factor must be specified. + for ( + factor, + weight, + ) in custom_infotype_config.Prediction_Factors_and_Weights.dict().items(): + if weight > 0: + assert ( + getattr(custom_infotype_config, factor) is not None + ), f"Missing Configuration for Prediction Factor {factor} for Custom Info Type {custom_infotype}" + + # Custom infotype supports only regex based prediction for column values + if custom_infotype_config.Prediction_Factors_and_Weights.Values > 0: + assert custom_infotype_config.Values + assert ( + custom_infotype_config.Values.prediction_type + == ValuePredictionType.REGEX + ), f"Invalid Prediction Type for Values for Custom Info Type {custom_infotype}. Only `regex` is supported." + + return info_types_config + + +class DataHubClassifier(Classifier): + def __init__(self, config: DataHubClassifierConfig): + self.config = config + @classmethod - def all(cls) -> "KeyValuePattern": - return KeyValuePattern() - - def value(self, string: str) -> List[str]: - matching_keys = [key for key in self.rules.keys() if re.match(key, string)] - if not matching_keys: - return [] - elif self.first_match_only: - return self.rules[matching_keys[0]] + def create(cls, config_dict: Optional[Dict[str, Any]]) -> "DataHubClassifier": + # This could be replaced by parsing to particular class, if required + if config_dict is not None: + config = DataHubClassifierConfig.parse_obj(config_dict) else: - return deduplicate_list( - [v for key in matching_keys for v in self.rules[key]] + config = DataHubClassifierConfig() + return cls(config) + + def classify(self, columns: List[ColumnInfo]) -> List[ColumnInfo]: + columns = predict_infotypes( + column_infos=columns, + confidence_level_threshold=self.config.confidence_level_threshold, + global_config={ + k: v.dict() for k, v in self.config.info_types_config.items() + }, + infotypes=self.config.info_types, + minimum_values_threshold=self.config.minimum_values_threshold, + ) + # New Exclusion Logic + excluded_columns = set() + for pattern in self.config.exclusion_config.get("name", []): + excluded_columns.update( + col.name for col in columns if re.match(pattern, col.name) ) + # Filter out excluded columns + columns = [col for col in columns if col.name not in excluded_columns] -class VersionedConfig(ConfigModel): - version: str = "1" - - -class LineageConfig(ConfigModel): - incremental_lineage: bool = Field( - default=False, - description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.", - ) - - sql_parser_use_external_process: bool = Field( - default=False, - description="When enabled, sql parser will run in isolated in a separate process. This can affect processing time but can protect from sql parser's mem leak.", - ) + return columns diff --git a/metadata-ingestion/src/datahub/ingestion/glossary/datahub_classifier.py b/metadata-ingestion/src/datahub/ingestion/glossary/datahub_classifier.py index 1f2b7f5689ea3c..75b073299d3f9a 100644 --- a/metadata-ingestion/src/datahub/ingestion/glossary/datahub_classifier.py +++ b/metadata-ingestion/src/datahub/ingestion/glossary/datahub_classifier.py @@ -1,15 +1,18 @@ from enum import Enum from typing import Any, Dict, List, Optional - +import re +import logging from datahub_classify.helper_classes import ColumnInfo from datahub_classify.infotype_predictor import predict_infotypes from datahub_classify.reference_input import input1 as default_config from pydantic import validator from pydantic.fields import Field -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import ConfigModel, PermissiveConfigModel from datahub.ingestion.glossary.classifier import Classifier +logger: logging.Logger = logging.getLogger(__name__) + class NameFactorConfig(ConfigModel): regex: List[str] = Field( @@ -18,6 +21,13 @@ class NameFactorConfig(ConfigModel): ) +class ExclusionNameConfig(ConfigModel): + regex: List[str] = Field( + default=[".*"], + description="List of regex patterns the column name follows for the info type", + ) + + class DescriptionFactorConfig(ConfigModel): regex: List[str] = Field( default=[".*"], @@ -58,6 +68,21 @@ class Config: Values: float = Field(alias="values") +class ExclusionConfig(ConfigModel): + class Config: + allow_population_by_field_name = True + + Name: Optional[ExclusionNameConfig] = Field(default=None, alias="name") + + Description: Optional[DescriptionFactorConfig] = Field( + default=None, alias="description" + ) + + Datatype: Optional[DataTypeFactorConfig] = Field(default=None, alias="datatype") + + Values: Optional[ValuesFactorConfig] = Field(default=None, alias="values") + + class InfoTypeConfig(ConfigModel): class Config: allow_population_by_field_name = True @@ -96,6 +121,10 @@ class DataHubClassifierConfig(ConfigModel): default=DEFAULT_CLASSIFIER_CONFIG, description="Configuration details for infotypes. See [reference_input.py](https://github.com/acryldata/datahub-classify/blob/main/datahub-classify/src/datahub_classify/reference_input.py) for default configuration.", ) + exclusion_config: Dict[str, ExclusionConfig] = Field( + default=None, + description="Configuration details for infotypes. See [reference_input.py](https://github.com/acryldata/datahub-classify/blob/main/datahub-classify/src/datahub_classify/reference_input.py) for default configuration.", + ) minimum_values_threshold: int = Field( default=50, description="Minimum number of non-null column values required to process `values` prediction factor.", @@ -173,5 +202,14 @@ def classify(self, columns: List[ColumnInfo]) -> List[ColumnInfo]: infotypes=self.config.info_types, minimum_values_threshold=self.config.minimum_values_threshold, ) + # New Exclusion Logic + excluded_columns = set() + for pattern in self.config.exclusion_config.get("names", []): + excluded_columns.update( + col.name for col in columns if re.match(pattern, col.name) + ) + + # Filter out excluded columns + columns = [col for col in columns if col.name not in excluded_columns] return columns diff --git a/test_classification.yml b/test_classification.yml new file mode 100644 index 00000000000000..f73930b26a69e7 --- /dev/null +++ b/test_classification.yml @@ -0,0 +1,33 @@ +source: + type: snowflake + config: + include_table_lineage: true + password: 'E*s7oA6mDwNA8Q' + account_id: xaa48144 + role: accountadmin + profiling: + enabled: false + include_view_lineage: true + warehouse: COMPUTE_WH + username: swaroop + database_pattern: + allow: + - DATAHUB_COMMUNITY + profile_pattern: + deny: + - ^long_tail_companions.adoption.pet_profiles$ + - ^long_tail_companions.analytics.pet_details$ + table_pattern: + allow: + - '.*MESSAGE_REPLY_PINNED_TO.*' + schema_pattern: + allow: + - SLACK + classification: + enabled: true + info_type_to_term: + Email_Address: 006399703afe918c680cea06f5faaf19 + classifiers: + - + type: datahub + config: {confidence_level_threshold: 0.7, info_types_config: {Email_Address: {prediction_factors_and_weights: {name: 0.8, description: 0, datatype: 0, values: 0}, name: {regex: [channel_id]}, description: {regex: [channel_id]}, datatype: {type: [str]}, values: {prediction_type: regex, regex: [channel_id]}}}, exclusion_config: {name: {regex: ["channel_id"]}}} \ No newline at end of file