diff --git a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx index ea84b585195188..d15f30bc03211c 100644 --- a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx +++ b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx @@ -2,10 +2,10 @@ import React from 'react'; import styled from 'styled-components'; import { Collapse } from 'antd'; -import { StructuredReportItem as StructuredReportItemType } from '../../types'; import { ANTD_GRAY } from '../../../../entity/shared/constants'; import { applyOpacity } from '../../../../shared/styleUtils'; import { StructuredReportItemContext } from './StructuredReportItemContext'; +import { StructuredReportLogEntry } from '../../types'; const StyledCollapse = styled(Collapse)<{ color: string }>` background-color: ${(props) => applyOpacity(props.color, 8)}; @@ -51,7 +51,7 @@ const Message = styled.div` `; interface Props { - item: StructuredReportItemType; + item: StructuredReportLogEntry; color: string; icon?: React.ComponentType; } diff --git a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemContext.tsx b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemContext.tsx index 10f0aa8bbe28c4..abf6ce2c676bc0 100644 --- a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemContext.tsx +++ b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemContext.tsx @@ -2,8 +2,8 @@ import React from 'react'; import styled from 'styled-components'; import { Tooltip } from 'antd'; -import { StructuredReportItem as StructuredReportItemType } from '../../types'; import { ANTD_GRAY } from '../../../../entity/shared/constants'; +import { StructuredReportLogEntry } from '../../types'; const Container = styled.div` display: flex; @@ -26,7 +26,7 @@ const Item = styled.div` `; interface Props { - item: StructuredReportItemType; + item: StructuredReportLogEntry; } export function StructuredReportItemContext({ item }: Props) { diff --git a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemList.tsx b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemList.tsx index 226186c46742e4..d84a5ec7d6ddc1 100644 --- a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemList.tsx +++ b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItemList.tsx @@ -1,8 +1,8 @@ import React, { useState } from 'react'; import styled from 'styled-components'; -import { StructuredReportItem as StructuredReportItemType } from '../../types'; import { StructuredReportItem } from './StructuredReportItem'; import { ShowMoreSection } from '../../../../shared/ShowMoreSection'; +import { StructuredReportLogEntry } from '../../types'; const ItemList = styled.div` display: flex; @@ -11,7 +11,7 @@ const ItemList = styled.div` `; interface Props { - items: StructuredReportItemType[]; + items: StructuredReportLogEntry[]; color: string; icon?: React.ComponentType; pageSize?: number; @@ -26,7 +26,12 @@ export function StructuredReportItemList({ items, color, icon, pageSize = 3 }: P <> {visibleItems.map((item) => ( - + ))} {totalCount > visibleCount ? 
( diff --git a/datahub-web-react/src/app/ingest/source/types.ts b/datahub-web-react/src/app/ingest/source/types.ts index 1a146c1abcfede..e99377159db956 100644 --- a/datahub-web-react/src/app/ingest/source/types.ts +++ b/datahub-web-react/src/app/ingest/source/types.ts @@ -16,71 +16,16 @@ export enum StructuredReportItemLevel { INFO, } -/** - * A set of standard / well supported warnings or error types - */ -export enum StructuredReportItemType { - /** - * Unauthorized to scan a specific part of the source - database, schema, project, or etc or a specific asset. - */ - SCAN_UNAUTHORIZED, - /** - * Unauthorized to access lineage information. - */ - LINEAGE_UNAUTHORIZED, - /** - * Unauthorized to access usage information - recent queries. - */ - USAGE_UNAUTHORIZED, - /** - * Unauthorized to profile some tables. - */ - PROFILING_UNAUTHORIZED, - /** - * Failure to parse some queries to extract column or asset-level lineage. - */ - LINEAGE_QUERY_PARSING_FAILED, - /** - * Failure to parse some queries - */ - USAGE_QUERY_PARSING_FAILED, - /** - * Failure to connect to the data source due to malformed connection details - */ - CONNECTION_FAILED_COORDINATES, - /** - * Failure to connect to the data source due to bad credentials - */ - CONNECTION_FAILED_CREDENTIALS, - /** - * Failure to connect to the data source due to unavailability of 3rd party service. - */ - CONNECTION_FAILED_SERVICE_UNAVAILABLE, - /** - * Failure to connect to the data source due to a client-side timeout. - */ - CONNECTION_FAILED_SERVICE_TIMEOUT, - /** - * Failure to connect to the data source for an unknown reason. - */ - CONNECTION_FAILED_UNKNOWN, - /** - * Fallback type for unrecognized structured report lines. - */ - UNKNOWN, -} - /** * A type describing an individual warning / failure item in a structured report. * * TODO: Determine whether we need a message field to be reported! */ -export interface StructuredReportItem { +export interface StructuredReportLogEntry { level: StructuredReportItemLevel; // The "log level" - title: string; // The "well-supported" or standardized title + title?: string; // The "well-supported" or standardized title message: string; // The message to display associated with the error. context: string[]; // The context of WHERE the issue was encountered, as a string. - rawType: string; // The "raw type" string received from the ingestion backend. } /** @@ -90,106 +35,5 @@ export interface StructuredReport { infoCount: number; errorCount: number; warnCount: number; - items: StructuredReportItem[]; + items: StructuredReportLogEntry[]; } - -/** - * A mapping of the frontend standardized error types to their messages and the raw backend error types that they are mapped from. 
- */ -export const STRUCTURED_REPORT_ITEM_DISPLAY_DETAILS = [ - { - type: StructuredReportItemType.UNKNOWN, - title: 'An unexpected issue occurred', - }, - { - type: StructuredReportItemType.SCAN_UNAUTHORIZED, - title: 'Unauthorized to scan some assets', - message: 'The provided credential details were unauthorized to scan some assets in the data source.', - rawTypes: [], - }, - { - type: StructuredReportItemType.LINEAGE_UNAUTHORIZED, - title: 'Unauthorized to extract some lineage', - message: - 'The provided credential details were unauthorized to extract some asset lineage from the data source.', - rawTypes: [], - }, - { - type: StructuredReportItemType.USAGE_UNAUTHORIZED, - title: 'Unauthorized to extract some usage', - message: - 'The provided credential details were unauthorized to extract some asset usage information from the data source.', - rawTypes: [], - }, - { - type: StructuredReportItemType.PROFILING_UNAUTHORIZED, - title: 'Unauthorized to extract some data statistics', - message: - 'The provided credential details were unauthorized to extract some asset profiles or statistics from the data source.', - rawTypes: [], - }, - { - type: StructuredReportItemType.LINEAGE_QUERY_PARSING_FAILED, - title: 'Failed to extract some lineage', - message: 'Failed to extract lineage for some assets due to failed query parsing.', - rawTypes: [], - }, - { - type: StructuredReportItemType.USAGE_QUERY_PARSING_FAILED, - title: 'Failed to extract some usage', - message: 'Failed to extract usage or popularity for some assets due to failed query parsing.', - rawTypes: [], - }, - { - type: StructuredReportItemType.CONNECTION_FAILED_COORDINATES, - title: 'Failed to connect using provided details', - message: - 'Failed to connect to data source. Unable to establish a connection to the specified service. Please check the connection details.', - rawTypes: [], - }, - { - type: StructuredReportItemType.CONNECTION_FAILED_CREDENTIALS, - title: 'Failed to connect using provided credentials', - message: - 'Failed to connect to data source. Unable to authenticate with the specified service using the provided credentials. Please check the connection credentials.', - rawTypes: [], - }, - { - type: StructuredReportItemType.CONNECTION_FAILED_SERVICE_UNAVAILABLE, - title: 'Service unavailable', - message: 'Failed to connect to the data source. The service is currently unavailable.', - rawTypes: [], - }, - { - type: StructuredReportItemType.CONNECTION_FAILED_SERVICE_TIMEOUT, - title: 'Service timeout', - message: - 'Failed to connect to the data source. A timeout was encountered when attempting to extract data from the data source.', - rawTypes: [], - }, - { - type: StructuredReportItemType.CONNECTION_FAILED_UNKNOWN, - title: 'Unknown connection error', - message: 'Failed to connect to the data source for an unknown reason. Please check the connection details.', - rawTypes: [], - }, -]; - -/** - * Map raw type to details associated above. - */ -export const STRUCTURED_REPORT_ITEM_RAW_TYPE_TO_DETAILS = new Map(); -STRUCTURED_REPORT_ITEM_DISPLAY_DETAILS.forEach((details) => { - const rawTypes = details.rawTypes || []; - rawTypes.forEach((rawType) => { - STRUCTURED_REPORT_ITEM_RAW_TYPE_TO_DETAILS.set(rawType, details); - }); -}); - -/** - * Map std type to details associated above. 
- */ -export const STRUCTURED_REPORT_ITEM_TYPE_TO_DETAILS = new Map(); -STRUCTURED_REPORT_ITEM_DISPLAY_DETAILS.forEach((details) => { - STRUCTURED_REPORT_ITEM_TYPE_TO_DETAILS.set(details.type, details); -}); diff --git a/datahub-web-react/src/app/ingest/source/utils.ts b/datahub-web-react/src/app/ingest/source/utils.ts index 3b9fedb6921032..7e00522ffaf47d 100644 --- a/datahub-web-react/src/app/ingest/source/utils.ts +++ b/datahub-web-react/src/app/ingest/source/utils.ts @@ -13,14 +13,7 @@ import EntityRegistry from '../../entity/EntityRegistry'; import { ANTD_GRAY, REDESIGN_COLORS } from '../../entity/shared/constants'; import { capitalizeFirstLetterOnly, pluralize } from '../../shared/textUtil'; import { SourceConfig } from './builder/types'; -import { - STRUCTURED_REPORT_ITEM_RAW_TYPE_TO_DETAILS, - STRUCTURED_REPORT_ITEM_TYPE_TO_DETAILS, - StructuredReport, - StructuredReportItem, - StructuredReportItemLevel, - StructuredReportItemType, -} from './types'; +import { StructuredReport, StructuredReportLogEntry, StructuredReportItemLevel } from './types'; export const getSourceConfigs = (ingestionSources: SourceConfig[], sourceType: string) => { const sourceConfigs = ingestionSources.find((source) => source.name === sourceType); @@ -147,36 +140,7 @@ export const validateURL = (fieldName: string) => { }; }; -const tryMapRawTypeToStructuredTypeByName = (rawType: string): StructuredReportItemType => { - const normalizedType = rawType.toLocaleUpperCase(); - return ( - StructuredReportItemType[normalizedType as keyof typeof StructuredReportItemType] || - StructuredReportItemType.UNKNOWN - ); -}; - -const getStructuredReportItemType = (rawType: string): StructuredReportItemType => { - return STRUCTURED_REPORT_ITEM_RAW_TYPE_TO_DETAILS.has(rawType) - ? STRUCTURED_REPORT_ITEM_RAW_TYPE_TO_DETAILS.get(rawType).type - : tryMapRawTypeToStructuredTypeByName(rawType); -}; - -const getStructuredReportItemTitle = (rawType: string): string => { - const type = getStructuredReportItemType(rawType); - return STRUCTURED_REPORT_ITEM_TYPE_TO_DETAILS.get(type)?.title; -}; - -const getStructuredReportItemLevel = (rawLevel: string) => { - const normalizedLevel = rawLevel.toLocaleUpperCase(); - return StructuredReportItemLevel[normalizedLevel as keyof typeof StructuredReportItemType]; -}; - -const getStructuredReportItemMessage = (rawType: string): string => { - const stdType = getStructuredReportItemType(rawType); - return StructuredReportItemType.UNKNOWN ? 
rawType : STRUCTURED_REPORT_ITEM_TYPE_TO_DETAILS.get(stdType)?.message; -}; - -const createStructuredReport = (items: StructuredReportItem[]): StructuredReport => { +const createStructuredReport = (items: StructuredReportLogEntry[]): StructuredReport => { const errorCount = items.filter((item) => item.level === StructuredReportItemLevel.ERROR).length; const warnCount = items.filter((item) => item.level === StructuredReportItemLevel.WARN).length; const infoCount = items.filter((item) => item.level === StructuredReportItemLevel.INFO).length; @@ -193,61 +157,69 @@ const transformToStructuredReport = (structuredReportObj: any): StructuredReport return null; } - /* Legacy help function to map backend failure or warning ingestion objects into StructuredReportItems */ + /* Legacy helper function to map backend failure or warning ingestion objects into StructuredReportLogEntry[] */ const mapItemObject = ( items: { [key: string]: string[] }, level: StructuredReportItemLevel, - ): StructuredReportItem[] => { - return Object.entries(items).map(([rawType, context]) => ({ + ): StructuredReportLogEntry[] => { + return Object.entries(items).map(([rawMessage, context]) => ({ level, - title: getStructuredReportItemTitle(rawType), - message: getStructuredReportItemMessage(rawType), + title: 'An unexpected issue occurred', + message: rawMessage, context, - rawType, })); }; - /* V2 help function to map backend failure or warning lists into StructuredReportItems */ - const mapItemArray = (items): StructuredReportItem[] => { - return items.map((item) => ({ - level: getStructuredReportItemLevel(item.level), - title: getStructuredReportItemTitle(item.type), - message: !item.message ? getStructuredReportItemMessage(item.type) : item.message, - context: item.context, - rawType: item.type, - })); + /* V2 helper function to map backend failure or warning lists into StructuredReportLogEntry[] */ + const mapItemArray = (items, level: StructuredReportItemLevel): StructuredReportLogEntry[] => { + return items + .map((item) => { + if (typeof item === 'string') { + // Handle "sampled from" case.. + return null; + } + + return { + level, + title: item.title || 'An unexpected issue occurred', + message: item.message, + context: item.context, + }; + }) + .filter((item) => item != null); }; - const sourceReport = structuredReportObj.source?.report; - - if (!sourceReport) { - return null; - } + try { + const sourceReport = structuredReportObj.source?.report; - // extract the report. - let structuredReport: StructuredReport; + if (!sourceReport) { + return null; + } - if (sourceReport.structured_logs) { - // If the report has NEW structured logs fields, use that field. - structuredReport = createStructuredReport(mapItemArray(sourceReport.structured_logs || [])); - } else { // Else fallback to using the legacy fields - const failures = sourceReport.failure_list + const failures = Array.isArray(sourceReport.failures) ? /* Use V2 failureList if present */ - mapItemArray(sourceReport.failure_list || []) + mapItemArray(sourceReport.failures || [], StructuredReportItemLevel.ERROR) : /* Else use the legacy object type */ mapItemObject(sourceReport.failures || {}, StructuredReportItemLevel.ERROR); - const warnings = sourceReport.warning_list + const warnings = Array.isArray(sourceReport.warnings) ? 
/* Use V2 warning if present */ - mapItemArray(sourceReport.warning_list || []) + mapItemArray(sourceReport.warnings || [], StructuredReportItemLevel.WARN) : /* Else use the legacy object type */ mapItemObject(sourceReport.warnings || {}, StructuredReportItemLevel.WARN); - structuredReport = createStructuredReport([...failures, ...warnings]); - } + const infos = Array.isArray(sourceReport.infos) + ? /* Use V2 infos if present */ + mapItemArray(sourceReport.infos || [], StructuredReportItemLevel.INFO) + : /* Else use the legacy object type */ + mapItemObject(sourceReport.infos || {}, StructuredReportItemLevel.INFO); - return structuredReport; + return createStructuredReport([...failures, ...warnings, ...infos]); + } catch (e) { + console.warn('Failed to extract structured report from ingestion report!', e); + return null; + } }; export const getStructuredReport = (result: Partial): StructuredReport | null => { diff --git a/docs/posts.md b/docs/posts.md index c44125bbd00179..107ea9020031b2 100644 --- a/docs/posts.md +++ b/docs/posts.md @@ -13,9 +13,33 @@ DataHub allows users to make Posts that can be displayed on the app. Currently, Anyone can view Posts on the home page. To create Posts, a user must either have the **Create Global Announcements** Privilege, or possess the **Admin** DataHub Role. -## Using Posts +## Creating Posts -To create a post, users must use the [createPost](../graphql/mutations.md#createPost) GraphQL mutation. There is currently no way to create posts using the UI, though this will come in the future. +### Create Posts Using the UI +To create a post, first navigate to the Settings tab in the top-right menu of DataHub. +Once you're on the Settings page, click 'Home Page Posts'. +To create a new Post, click '+ New Post'. +

+<!-- Image: Creating a new post -->
+
+DataHub currently supports two types of Post content. Posts can either be of type **Text** or **Link**. Click on "Post Type" to switch between these types.
+
+<!-- Image: Selecting text post type -->
+
+<!-- Image: Selecting link post type -->
+
+If you choose the text type, enter the title and description as prompted; if you choose the link type, enter the title, the link URL, and the image URL as prompted.
+
+Click 'Create' to complete.
+
+<!-- Image: Viewing posts -->
+ +### Create Posts Using the GraphQL + +To create a post via API, you can call the [createPost](../graphql/mutations.md#createPost) GraphQL mutation. +To create a post via API, you can call the [createPost](../graphql/mutations.md#createPost) GraphQL mutation. There is only one type of Post that can be currently made, and that is a **Home Page Announcement**. This may be extended in the future to other surfaces. diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index cd4ed37d110bd2..d3df59c6468df5 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -103,7 +103,7 @@ } classification_lib = { - "acryl-datahub-classify==0.0.10", + "acryl-datahub-classify==0.0.11", # This is a bit of a hack. Because we download the SpaCy model at runtime in the classify plugin, # we need pip to be available. "pip", diff --git a/metadata-ingestion/src/datahub/cli/json_file.py b/metadata-ingestion/src/datahub/cli/json_file.py index c3d7a85d722e2d..8fb3ea46c64e81 100644 --- a/metadata-ingestion/src/datahub/cli/json_file.py +++ b/metadata-ingestion/src/datahub/cli/json_file.py @@ -9,14 +9,13 @@ def check_mce_file(filepath: str) -> str: mce_source = GenericFileSource.create({"filename": filepath}, None) for _ in mce_source.get_workunits(): pass - if mce_source.get_report().failures: + if len(mce_source.get_report().failures): # raise the first failure found logger.error( f"Event file check failed with errors. Raising first error found. Full report {mce_source.get_report().as_string()}" ) - for failure_list in mce_source.get_report().failures.values(): - if len(failure_list): - raise Exception(failure_list[0]) + for failure in mce_source.get_report().failures: + raise Exception(failure.context) raise Exception( f"Failed to process file due to {mce_source.get_report().failures}" ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 173f7ec59b0c23..d78500b4401e5f 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -21,6 +21,7 @@ ) from pydantic import BaseModel +from typing_extensions import LiteralString from datahub.configuration.common import ConfigModel from datahub.configuration.source_common import PlatformInstanceConfigMixin @@ -62,6 +63,107 @@ class SourceCapability(Enum): CLASSIFICATION = "Classification" +class StructuredLogLevel(Enum): + INFO = logging.INFO + WARN = logging.WARN + ERROR = logging.ERROR + + +@dataclass +class StructuredLogEntry(Report): + title: Optional[str] + message: str + context: LossyList[str] + + +@dataclass +class StructuredLogs(Report): + # Underlying Lossy Dicts to Capture Errors, Warnings, and Infos. + _entries: Dict[StructuredLogLevel, LossyDict[str, StructuredLogEntry]] = field( + default_factory=lambda: { + StructuredLogLevel.ERROR: LossyDict(10), + StructuredLogLevel.WARN: LossyDict(10), + StructuredLogLevel.INFO: LossyDict(10), + } + ) + + def report_log( + self, + level: StructuredLogLevel, + message: LiteralString, + title: Optional[LiteralString] = None, + context: Optional[str] = None, + exc: Optional[BaseException] = None, + log: bool = False, + ) -> None: + """ + Report a user-facing warning for the ingestion run. + + Args: + level: The level of the log entry. + message: The main message associated with the report entry. This should be a human-readable message. + title: The category / heading to present on for this message in the UI. 
+ context: Additional context (e.g. where, how) for the log entry. + exc: The exception associated with the event. We'll show the stack trace when in debug mode. + """ + + stacklevel = 2 + + log_key = f"{title}-{message}" + entries = self._entries[level] + + log_content = f"{message} => {context}" if context else message + if exc: + log_content += f"{log_content}: {exc}" + + if log: + logger.log(level=level.value, msg=log_content, stacklevel=stacklevel) + logger.log( + level=logging.DEBUG, + msg="Full stack trace:", + stacklevel=stacklevel, + exc_info=exc, + ) + + # Add the simple exception details to the context. + context = f"{context}: {exc}" + elif log: + logger.log(level=level.value, msg=log_content, stacklevel=stacklevel) + + if log_key not in entries: + context_list: LossyList[str] = LossyList() + if context is not None: + context_list.append(context) + entries[log_key] = StructuredLogEntry( + title=title, + message=message, + context=context_list, + ) + else: + if context is not None: + entries[log_key].context.append(context) + + def _get_of_type(self, level: StructuredLogLevel) -> LossyList[StructuredLogEntry]: + entries = self._entries[level] + result: LossyList[StructuredLogEntry] = LossyList() + for log in entries.values(): + result.append(log) + result.set_total(entries.total_key_count()) + return result + + @property + def warnings(self) -> LossyList[StructuredLogEntry]: + return self._get_of_type(StructuredLogLevel.WARN) + + @property + def failures(self) -> LossyList[StructuredLogEntry]: + return self._get_of_type(StructuredLogLevel.ERROR) + + @property + def infos(self) -> LossyList[StructuredLogEntry]: + return self._get_of_type(StructuredLogLevel.INFO) + + @dataclass class SourceReport(Report): events_produced: int = 0 @@ -76,8 +178,19 @@ class SourceReport(Report): default_factory=lambda: defaultdict(lambda: defaultdict(LossyList)) ) - warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict) - failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict) + _structured_logs: StructuredLogs = field(default_factory=StructuredLogs) + + @property + def warnings(self) -> LossyList[StructuredLogEntry]: + return self._structured_logs.warnings + + @property + def failures(self) -> LossyList[StructuredLogEntry]: + return self._structured_logs.failures + + @property + def infos(self) -> LossyList[StructuredLogEntry]: + return self._structured_logs.infos def report_workunit(self, wu: WorkUnit) -> None: self.events_produced += 1 @@ -109,28 +222,74 @@ def report_workunit(self, wu: WorkUnit) -> None: "fineGrainedLineages" ].append(urn) - def report_warning(self, key: str, reason: str) -> None: - warnings = self.warnings.get(key, LossyList()) - warnings.append(reason) - self.warnings[key] = warnings + def report_warning( + self, + message: LiteralString, + context: Optional[str] = None, + title: Optional[LiteralString] = None, + exc: Optional[BaseException] = None, + ) -> None: + self._structured_logs.report_log( + StructuredLogLevel.WARN, message, title, context, exc, log=False + ) + + def warning( + self, + message: LiteralString, + context: Optional[str] = None, + title: Optional[LiteralString] = None, + exc: Optional[BaseException] = None, + ) -> None: + self._structured_logs.report_log( + StructuredLogLevel.WARN, message, title, context, exc, log=True + ) - def warning(self, key: str, reason: str) -> None: - self.report_warning(key, reason) - logger.warning(f"{key} => {reason}", stacklevel=2) + def report_failure( + self, + message: LiteralString, 
+ context: Optional[str] = None, + title: Optional[LiteralString] = None, + exc: Optional[BaseException] = None, + ) -> None: + self._structured_logs.report_log( + StructuredLogLevel.ERROR, message, title, context, exc, log=False + ) - def report_failure(self, key: str, reason: str) -> None: - failures = self.failures.get(key, LossyList()) - failures.append(reason) - self.failures[key] = failures + def failure( + self, + message: LiteralString, + context: Optional[str] = None, + title: Optional[LiteralString] = None, + exc: Optional[BaseException] = None, + ) -> None: + self._structured_logs.report_log( + StructuredLogLevel.ERROR, message, title, context, exc, log=True + ) - def failure(self, key: str, reason: str) -> None: - self.report_failure(key, reason) - logger.error(f"{key} => {reason}", stacklevel=2) + def info( + self, + message: LiteralString, + context: Optional[str] = None, + title: Optional[LiteralString] = None, + exc: Optional[BaseException] = None, + ) -> None: + self._structured_logs.report_log( + StructuredLogLevel.INFO, message, title, context, exc, log=True + ) def __post_init__(self) -> None: self.start_time = datetime.datetime.now() self.running_time: datetime.timedelta = datetime.timedelta(seconds=0) + def as_obj(self) -> dict: + return { + **super().as_obj(), + # To reduce the amount of nesting, we pull these fields out of the structured log. + "failures": Report.to_pure_python_obj(self.failures), + "warnings": Report.to_pure_python_obj(self.warnings), + "infos": Report.to_pure_python_obj(self.infos), + } + def compute_stats(self) -> None: duration = datetime.datetime.now() - self.start_time workunits_produced = self.events_produced diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 4e66ee9c978f0b..e61ffa46b3c107 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -50,7 +50,7 @@ clear_global_warnings, get_global_warnings, ) -from datahub.utilities.lossy_collections import LossyDict, LossyList +from datahub.utilities.lossy_collections import LossyList logger = logging.getLogger(__name__) _REPORT_PRINT_INTERVAL_SECONDS = 60 @@ -124,7 +124,7 @@ class PipelineInitError(Exception): class PipelineStatus(enum.Enum): UNKNOWN = enum.auto() COMPLETED = enum.auto() - PIPELINE_ERROR = enum.auto() + ERROR = enum.auto() CANCELLED = enum.auto() @@ -508,16 +508,8 @@ def run(self) -> None: logger.error("Caught error", exc_info=e) raise except Exception as exc: - self.final_status = PipelineStatus.PIPELINE_ERROR - logger.exception("Ingestion pipeline threw an uncaught exception") - - # HACK: We'll report this as a source error, since we don't have a great place to put it. - # It theoretically could've come from any part of the pipeline, but usually it's from the source. - # This ensures that it is included in the report, and that the run is marked as failed. 
- self.source.get_report().report_failure( - "pipeline_error", - f"Ingestion pipeline threw an uncaught exception: {exc}", - ) + self.final_status = PipelineStatus.ERROR + self._handle_uncaught_pipeline_exception(exc) finally: clear_global_warnings() @@ -627,11 +619,8 @@ def log_ingestion_stats(self) -> None: self.ctx.graph, ) - def _approx_all_vals(self, d: LossyDict[str, LossyList]) -> int: - result = d.dropped_keys_count() - for k in d: - result += len(d[k]) - return result + def _approx_all_vals(self, d: LossyList[Any]) -> int: + return d.total_elements def _get_text_color(self, running: bool, failures: bool, warnings: bool) -> str: if running: @@ -657,7 +646,7 @@ def pretty_print_summary( if ( not workunits_produced and not currently_running - and self.final_status == PipelineStatus.PIPELINE_ERROR + and self.final_status == PipelineStatus.ERROR ): # If the pipeline threw an uncaught exception before doing anything, printing # out the report would just be annoying. @@ -725,6 +714,14 @@ def pretty_print_summary( ) return 0 + def _handle_uncaught_pipeline_exception(self, exc: Exception) -> None: + logger.exception("Ingestion pipeline threw an uncaught exception") + self.source.get_report().report_failure( + title="Pipeline Error", + message="Ingestion pipeline raised an unexpected exception!", + exc=exc, + ) + def _get_structured_report(self) -> Dict[str, Any]: return { "cli": self.cli_report.as_obj(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index e3933b985c28ad..6cab0ffc8f25c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -466,8 +466,9 @@ def construct_schema_metadata( if schema_size > MAX_SCHEMA_SIZE: # downsample the schema, using frequency as the sort key self.report.report_warning( - key=dataset_urn, - reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}", + title="Schema Size Too Large", + message=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}", + context=f"Collection: {dataset_urn}", ) # Add this information to the custom properties so user can know they are looking at down sampled schema @@ -535,7 +536,9 @@ def get_native_type(self, attribute_type: Union[type, str], table_name: str) -> ) if type_string is None: self.report.report_warning( - table_name, f"unable to map type {attribute_type} to native data type" + title="Unable to Map Attribute Type", + message=f"Unable to map type {attribute_type} to native data type", + context=f"Collection: {table_name}", ) return _attribute_type_to_native_type_mapping[attribute_type] return type_string @@ -550,8 +553,9 @@ def get_field_type( if type_class is None: self.report.report_warning( - table_name, - f"unable to map type {attribute_type} to metadata schema field type", + title="Unable to Map Field Type", + message=f"Unable to map type {attribute_type} to metadata schema field type", + context=f"Collection: {table_name}", ) type_class = NullTypeClass return SchemaFieldDataType(type=type_class()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 44b6fcdf6d4673..1d7956a8065582 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -578,8 +578,8 @@ def _get_tags_from_field_type( ) else: reporter.report_warning( - "lookml", - f"Failed to map view field type {field.field_type}. Won't emit tags for measure and dimension", + title="Failed to Map View Field Type", + message=f"Failed to map view field type {field.field_type}. Won't emit tags for measure and dimension", ) if schema_field_tags: @@ -835,8 +835,9 @@ def from_api( # noqa: C901 potential_views.append(view_name) except AssertionError: reporter.report_warning( - key=f"chart-field-{field_name}", - reason="The field was not prefixed by a view name. This can happen when the field references another dynamic field.", + title="Missing View Name", + message="The field was not prefixed by a view name. This can happen when the field references another dynamic field.", + context=view_name, ) continue @@ -982,8 +983,9 @@ def from_api( # noqa: C901 except AssertionError: reporter.report_warning( - key="chart-", - reason="Was unable to find dependent views for this chart", + title="Unable to find Views", + message="Encountered exception while attempting to find dependent views for this chart", + context=f"Explore: {explore_name}, Mode: {model}, Views: {views}", ) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index 031186bb6e7a2c..72d094c2cf9423 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -284,8 +284,9 @@ def _get_views_from_fields(self, fields: List[str]) -> List[str]: views.add(view_name) except AssertionError: self.reporter.report_warning( - key=f"chart-field-{field_name}", - reason="The field was not prefixed by a view name. This can happen when the field references another dynamic field.", + title="Failed to Extract View Name from Field", + message="The field was not prefixed by a view name. This can happen when the field references another dynamic field.", + context=f"Field Name: {field_name}", ) continue @@ -593,16 +594,18 @@ def _get_chart_type( type_str = dashboard_element.type if not type_str: self.reporter.report_warning( - key=f"looker-chart-{dashboard_element.id}", - reason=f"Chart type {type_str} is missing. Setting to None", + title="Unrecognized Chart Type", + message=f"Chart type {type_str} is not recognized. Setting to None", + context=f"Dashboard Id: {dashboard_element.id}", ) return None try: chart_type = type_mapping[type_str] except KeyError: self.reporter.report_warning( - key=f"looker-chart-{dashboard_element.id}", - reason=f"Chart type {type_str} not supported. Setting to None", + title="Unrecognized Chart Type", + message=f"Chart type {type_str} is not recognized. Setting to None", + context=f"Dashboard Id: {dashboard_element.id}", ) chart_type = None @@ -1251,8 +1254,9 @@ def process_dashboard( except SDKError: # A looker dashboard could be deleted in between the list and the get self.reporter.report_warning( - dashboard_id, - f"Error occurred while loading dashboard {dashboard_id}. Skipping.", + title="Error Loading Dashboard", + message="Error occurred while attempting to loading dashboard from Looker API. 
Skipping.", + context=f"Dashboard ID: {dashboard_id}", ) return [], None, dashboard_id, start_time, datetime.datetime.now() @@ -1262,7 +1266,9 @@ def process_dashboard( or dashboard_object.folder.is_personal_descendant ): self.reporter.report_warning( - dashboard_id, "Dropped due to being a personal folder" + title="Dropped Dashboard", + message="Dropped due to being a personal folder", + context=f"Dashboard ID: {dashboard_id}", ) self.reporter.report_dashboards_dropped(dashboard_id) return [], None, dashboard_id, start_time, datetime.datetime.now() @@ -1540,7 +1546,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ): # Looks like we tried to extract owners and could not find their email addresses. This is likely a permissions issue self.reporter.report_warning( - "api", + "Failed to extract owner emails", "Failed to extract owners emails for any dashboards. Please enable the see_users permission for your Looker API key", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index 0c9b3ae8695cf4..7f8ae5ead81a7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -419,7 +419,10 @@ def from_looker_dict( explores.extend(included_explores) except Exception as e: reporter.report_warning( - path, f"Failed to load {included_file} due to {e}" + title="Error Loading Include File", + message="Failed to load included file", + context=f"Include Details: {included_file}", + exc=e, ) # continue in this case, as it might be better to load and resolve whatever we can @@ -520,10 +523,16 @@ def resolve_includes( f"traversal_path={traversal_path}, included_files = {included_files}, seen_so_far: {seen_so_far}" ) if "*" not in inc and not included_files: - reporter.report_failure(path, f"cannot resolve include {inc}") + reporter.report_failure( + title="Error Resolving Include", + message=f"Cannot resolve include {inc}", + context=f"Path: {path}", + ) elif not included_files: reporter.report_failure( - path, f"did not resolve anything for wildcard include {inc}" + title="Error Resolving Include", + message=f"Did not resolve anything for wildcard include {inc}", + context=f"Path: {path}", ) # only load files that we haven't seen so far included_files = [x for x in included_files if x not in seen_so_far] @@ -562,7 +571,10 @@ def resolve_includes( ) except Exception as e: reporter.report_warning( - path, f"Failed to load {included_file} due to {e}" + title="Error Loading Include", + message="Failed to load include file", + context=f"Include Details: {included_file}", + exc=e, ) # continue in this case, as it might be better to load and resolve whatever we can @@ -673,7 +685,11 @@ def _load_viewfile( with open(path) as file: raw_file_content = file.read() except Exception as e: - self.reporter.report_failure(path, f"failed to load view file: {e}") + self.reporter.report_failure( + message="Failed to read view file", + context=f"Path: {path}", + exc=e, + ) return None try: logger.debug(f"Loading viewfile {path}") @@ -691,7 +707,11 @@ def _load_viewfile( self.viewfile_cache[path] = looker_viewfile return looker_viewfile except Exception as e: - self.reporter.report_failure(path, f"failed to load view file: {e}") + self.reporter.report_failure( + message="Failed to parse view file", + context=f"Path: {path}", + exc=e, + ) return None def load_viewfile( @@ -1348,8 +1368,10 @@ def 
_extract_metadata_from_derived_table_sql( except Exception as e: reporter.query_parse_failures += 1 reporter.report_warning( - f"looker-view-{view_name}", - f"Failed to parse sql query, lineage will not be accurate. Exception: {e}", + title="Error Parsing SQL", + message="Failed to parse sql query, lineage will not be accurate.", + context=f"Table Name: {sql_table_name}, Query: {sql_query}", + exc=e, ) sql_table_names = [table for table in sql_table_names if "{%" not in table] @@ -1585,7 +1607,9 @@ def _generate_fully_qualified_name( return dataset_name.lower() self.reporter.report_warning( - key=sql_table_name, reason=f"{sql_table_name} has more than 3 parts." + title="Malformed Table Name", + message="Table name has more than 3 parts.", + context=f"Table Name: {sql_table_name}", ) return sql_table_name.lower() @@ -1653,8 +1677,9 @@ def _get_connection_def_based_on_connection_string( return connection_def except ConfigurationError: self.reporter.report_warning( - f"connection-{connection}", - "Failed to load connection from Looker", + title="Failed to Resolve Connection", + message="Failed to resolve connection from Looker", + context=f"Connection: {connection}", ) return None @@ -1931,7 +1956,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if not self.report.events_produced and not self.report.failures: # Don't pass if we didn't produce any events. self.report.report_failure( - "
", + "No Metadata Produced", "No metadata was produced. Check the logs for more details.", ) @@ -2058,7 +2083,10 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 model = self._load_model(str(file_path)) except Exception as e: self.reporter.report_warning( - model_name, f"unable to load Looker model at {file_path}: {repr(e)}" + title="Error Loading Model File", + message="Unable to load Looker model from file.", + context=f"Model Name: {model_name}, File Path: {file_path}", + exc=e, ) continue @@ -2069,8 +2097,9 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 if connectionDefinition is None: self.reporter.report_warning( - f"model-{model_name}", - f"Failed to load connection {model.connection}. Check your API key permissions and/or connection_to_platform_map configuration.", + title="Failed to Load Connection", + message="Failed to load connection. Check your API key permissions and/or connection_to_platform_map configuration.", + context=f"Connection: {model.connection}", ) self.reporter.report_models_dropped(model_name) continue @@ -2110,8 +2139,10 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 explore_reachable_views.add(view_name.include) except Exception as e: self.reporter.report_warning( - f"{model}.explores", - f"failed to process {explore_dict} due to {e}. Run with --debug for full stacktrace", + title="Failed to process explores", + message="Failed to process explore dictionary.", + context=f"Explore Details: {explore_dict}", + exc=e, ) logger.debug("Failed to process explore", exc_info=e) @@ -2191,8 +2222,10 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 ) except Exception as e: self.reporter.report_warning( - include.include, - f"unable to load Looker view {raw_view}: {repr(e)}", + title="Error Loading View", + message="Unable to load Looker View.", + context=f"View Details: {raw_view}", + exc=e, ) continue diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py index 12a76ff7b33ff7..e401f60445d1bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py @@ -211,8 +211,8 @@ def setup_session(self) -> None: test_response.raise_for_status() except HTTPError as e: self.report.report_failure( - key="metabase-session", - reason=f"Unable to retrieve user {self.config.username} information. %s" + title="Unable to Retrieve Current User", + message=f"Unable to retrieve user {self.config.username} information. %s" % str(e), ) @@ -223,8 +223,8 @@ def close(self) -> None: ) if response.status_code not in (200, 204): self.report.report_failure( - key="metabase-session", - reason=f"Unable to logout for user {self.config.username}", + title="Unable to Log User Out", + message=f"Unable to logout for user {self.config.username}", ) super().close() @@ -257,8 +257,9 @@ def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]: except HTTPError as http_error: self.report.report_failure( - key="metabase-dashboard", - reason=f"Unable to retrieve dashboards. 
" f"Reason: {str(http_error)}", + title="Unable to Retrieve Dashboards", + message="Request to retrieve dashboards from Metabase failed.", + context=f"Error: {str(http_error)}", ) @staticmethod @@ -283,8 +284,9 @@ def construct_dashboard_from_api_data( dashboard_details = dashboard_response.json() except HTTPError as http_error: self.report.report_warning( - key=f"metabase-dashboard-{dashboard_id}", - reason=f"Unable to retrieve dashboard. " f"Reason: {str(http_error)}", + title="Unable to Retrieve Dashboard", + message="Request to retrieve dashboards from Metabase failed.", + context=f"Dashboard ID: {dashboard_id}, Error: {str(http_error)}", ) return None @@ -346,14 +348,16 @@ def _get_ownership(self, creator_id: int) -> Optional[OwnershipClass]: and http_error.response.status_code == 404 ): self.report.report_warning( - key=f"metabase-user-{creator_id}", - reason=f"User {creator_id} is blocked in Metabase or missing", + title="Cannot find user", + message="User is blocked in Metabase or missing", + context=f"Creator ID: {creator_id}", ) return None # For cases when the error is not 404 but something else self.report.report_warning( - key=f"metabase-user-{creator_id}", - reason=f"Unable to retrieve User info. " f"Reason: {str(http_error)}", + title="Failed to retrieve user", + message="Request to Metabase Failed", + context=f"Creator ID: {creator_id}, Error: {str(http_error)}", ) return None @@ -385,8 +389,9 @@ def emit_card_mces(self) -> Iterable[MetadataWorkUnit]: except HTTPError as http_error: self.report.report_failure( - key="metabase-cards", - reason=f"Unable to retrieve cards. " f"Reason: {str(http_error)}", + title="Unable to Retrieve Cards", + message="Request to retrieve cards from Metabase failed.", + context=f"Error: {str(http_error)}", ) return None @@ -407,8 +412,9 @@ def get_card_details_by_id(self, card_id: Union[int, str]) -> dict: return card_response.json() except HTTPError as http_error: self.report.report_warning( - key=f"metabase-card-{card_id}", - reason=f"Unable to retrieve Card info. " f"Reason: {str(http_error)}", + title="Unable to Retrieve Card", + message="Request to retrieve Card from Metabase failed.", + context=f"Card ID: {card_id}, Error: {str(http_error)}", ) return {} @@ -416,16 +422,18 @@ def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapsho card_id = card_data.get("id") if card_id is None: self.report.report_warning( - key="metabase-card", - reason=f"Unable to get Card id from card data {str(card_data)}", + title="Card is missing 'id'", + message="Unable to get field id from card data.", + context=f"Card Details: {str(card_data)}", ) return None card_details = self.get_card_details_by_id(card_id) if not card_details: self.report.report_warning( - key=f"metabase-card-{card_id}", - reason="Unable to construct Card due to empty card details", + title="Missing Card Details", + message="Unable to construct Card due to empty card details", + context=f"Card ID: {card_id}", ) return None @@ -500,16 +508,18 @@ def _get_chart_type(self, card_id: int, display_type: str) -> Optional[str]: } if not display_type: self.report.report_warning( - key=f"metabase-card-{card_id}", - reason=f"Card type {display_type} is missing. Setting to None", + title="Unrecognized Card Type", + message=f"Unrecognized card type {display_type} found. 
Setting to None", + context=f"Card ID: {card_id}", ) return None try: chart_type = type_mapping[display_type] except KeyError: self.report.report_warning( - key=f"metabase-card-{card_id}", - reason=f"Chart type {display_type} not supported. Setting to None", + title="Unrecognized Chart Type", + message=f"Unrecognized chart type {display_type} found. Setting to None", + context=f"Card ID: {card_id}", ) chart_type = None @@ -543,8 +553,9 @@ def get_datasource_urn( ) -> Optional[List]: if recursion_depth > DATASOURCE_URN_RECURSION_LIMIT: self.report.report_warning( - key=f"metabase-card-{card_details.get('id')}", - reason="Unable to retrieve Card info. Reason: source table recursion depth exceeded", + title="Unable to Retrieve Card Info", + message="Unable to retrieve Card info. Source table recursion depth exceeded.", + context=f"Card Details: {card_details}", ) return None @@ -557,8 +568,9 @@ def get_datasource_urn( ) = self.get_datasource_from_id(datasource_id) if not platform: self.report.report_warning( - key=f"metabase-datasource-{datasource_id}", - reason=f"Unable to detect platform for database id {datasource_id}", + title="Unable to find Data Platform", + message="Unable to detect Data Platform for database id", + context=f"Data Source ID: {datasource_id}", ) return None @@ -613,8 +625,9 @@ def get_datasource_urn( f"{result.debug_info.table_error}" ) self.report.report_warning( - key="metabase-query", - reason=f"Unable to retrieve lineage from query: {raw_query_stripped}", + title="Failed to Extract Lineage", + message="Unable to retrieve lineage from query", + context=f"Query: {raw_query_stripped}", ) return result.in_tables @@ -654,8 +667,9 @@ def get_source_table_from_id( except HTTPError as http_error: self.report.report_warning( - key=f"metabase-table-{table_id}", - reason=f"Unable to retrieve source table. Reason: {str(http_error)}", + title="Failed to Retrieve Source Table", + message="Request to retrieve source table from Metadabase failed", + context=f"Table ID: {table_id}, Error: {str(http_error)}", ) return None, None @@ -702,8 +716,9 @@ def get_datasource_from_id( dataset_json = dataset_response.json() except HTTPError as http_error: self.report.report_warning( - key=f"metabase-datasource-{datasource_id}", - reason=f"Unable to retrieve Datasource. " f"Reason: {str(http_error)}", + title="Unable to Retrieve Data Source", + message="Request to retrieve data source from Metabase failed.", + context=f"Data Source ID: {datasource_id}, Error: {str(http_error)}", ) # returning empty string as `platform` because # `make_dataset_urn_with_platform_instance()` only accepts `str` @@ -730,8 +745,9 @@ def get_datasource_from_id( platform = engine self.report.report_warning( - key=f"metabase-platform-{datasource_id}", - reason=f"Platform was not found in DataHub. Using {platform} name as is", + title="Unrecognized Data Platform found", + message="Data Platform was not found. 
Using platform name as is", + context=f"Platform: {platform}", ) platform_instance = self.get_platform_instance( @@ -766,8 +782,9 @@ def get_datasource_from_id( dbname = self.config.database_alias_map[platform] else: self.report.report_warning( - key=f"metabase-dbname-{datasource_id}", - reason=f"Cannot determine database name for platform: {platform}", + title="Cannot resolve Database Name", + message="Cannot determine database name for platform", + context=f"Platform: {platform}", ) return platform, dbname, schema, platform_instance diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 4aa6248d09ab38..4b4822bcb98cae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -311,8 +311,9 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig): self._get_request_json(f"{self.config.connect_uri}/api/verify") except HTTPError as http_error: self.report.report_failure( - key="mode-session", - reason=f"Unable to verify connection. Error was: {str(http_error)}", + title="Failed to Connect", + message="Unable to verify connection to mode.", + context=f"Error: {str(http_error)}", ) self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}" @@ -371,15 +372,15 @@ def construct_dashboard( if not report_token: self.report.report_warning( - key="mode-report", - reason=f"Report token is missing for {report_info.get('id', '')}", + title="Missing Report Token", + message=f"Report token is missing for {report_info.get('id', '')}", ) return None if not report_info.get("id"): self.report.report_warning( - key="mode-report", - reason=f"Report id is missing for {report_info.get('token', '')}", + title="Missing Report ID", + message=f"Report id is missing for {report_info.get('token', '')}", ) return None @@ -488,9 +489,9 @@ def _get_creator(self, href: str) -> Optional[str]: ) except HTTPError as http_error: self.report.report_warning( - key="mode-user", - reason=f"Unable to retrieve user for {href}, " - f"Reason: {str(http_error)}", + title="Failed to retrieve Mode creator", + message=f"Unable to retrieve user for {href}", + context=f"Reason: {str(http_error)}", ) return user @@ -528,9 +529,9 @@ def _get_space_name_and_tokens(self) -> dict: space_info[s.get("token", "")] = s.get("name", "") except HTTPError as http_error: self.report.report_failure( - key="mode-spaces", - reason=f"Unable to retrieve spaces/collections for {self.workspace_uri}, " - f"Reason: {str(http_error)}", + title="Failed to Retrieve Spaces", + message="Unable to retrieve spaces / collections for workspace.", + context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}", ) return space_info @@ -559,17 +560,19 @@ def _get_chart_type(self, token: str, display_type: str) -> Optional[str]: "histogram": ChartTypeClass.HISTOGRAM, } if not display_type: - self.report.report_warning( - key="mode-chart-type-mapper", - reason=f"{token}: Chart type is missing. Setting to None", + self.report.info( + title="Missing chart type found", + message="Chart type is missing. Setting to None", + context=f"Token: {token}", ) return None try: chart_type = type_mapping[display_type] except KeyError: - self.report.report_warning( - key="mode-chart-type-mapper", - reason=f"{token}: Chart type {display_type} not supported. Setting to None", + self.report.info( + title="Unrecognized chart type found", + message=f"Chart type {display_type} not supported. 
Setting to None", + context=f"Token: {token}", ) chart_type = None @@ -661,8 +664,8 @@ def _get_datahub_friendly_platform(self, adapter, platform): return platform_mapping[adapter] else: self.report.report_warning( - key=f"mode-platform-{adapter}", - reason=f"Platform was not found in DataHub. " + title="Unrecognized Platform Found", + message=f"Platform was not found in DataHub. " f"Using {platform} name as is", ) @@ -676,8 +679,9 @@ def _get_data_sources(self) -> List[dict]: data_sources = ds_json.get("_embedded", {}).get("data_sources", []) except HTTPError as http_error: self.report.report_failure( - key="mode-data-sources", - reason=f"Unable to retrieve data sources. Reason: {str(http_error)}", + title="Failed to retrieve Data Sources", + message="Unable to retrieve data sources from Mode.", + context=f"Error: {str(http_error)}", ) return data_sources @@ -690,8 +694,9 @@ def _get_platform_and_dbname( if not data_sources: self.report.report_failure( - key=f"mode-datasource-{data_source_id}", - reason=f"No data sources found for datasource id: " f"{data_source_id}", + title="No Data Sources Found", + message="Could not find data sources matching some ids", + context=f"Data Soutce ID: {data_source_id}", ) return None, None @@ -704,8 +709,8 @@ def _get_platform_and_dbname( return platform, database else: self.report.report_warning( - key=f"mode-datasource-{data_source_id}", - reason=f"Cannot create datasource urn for datasource id: " + title="Failed to create Data Platform Urn", + message=f"Cannot create datasource urn for datasource id: " f"{data_source_id}", ) return None, None @@ -761,9 +766,9 @@ def _get_definition(self, definition_name): except HTTPError as http_error: self.report.report_failure( - key=f"mode-definition-{definition_name}", - reason=f"Unable to retrieve definition for {definition_name}, " - f"Reason: {str(http_error)}", + title="Failed to Retrieve Definition", + message="Unable to retrieve definition from Mode.", + context=f"Definition Name: {definition_name}, Error: {str(http_error)}", ) return None @@ -782,10 +787,9 @@ def _get_source_from_query(self, raw_query: str) -> set: source_paths.add(f"{source_schema}.{source_table}") except Exception as e: self.report.report_failure( - key="mode-query", - reason=f"Unable to retrieve lineage from query. 
" - f"Query: {raw_query} " - f"Reason: {str(e)} ", + title="Failed to Extract Lineage From Query", + message="Unable to retrieve lineage from Mode query.", + context=f"Query: {raw_query}, Error: {str(e)}", ) return source_paths @@ -1321,9 +1325,9 @@ def _get_reports(self, space_token: str) -> List[dict]: reports = reports_json.get("_embedded", {}).get("reports", {}) except HTTPError as http_error: self.report.report_failure( - key=f"mode-report-{space_token}", - reason=f"Unable to retrieve reports for space token: {space_token}, " - f"Reason: {str(http_error)}", + title="Failed to Retrieve Reports for Space", + message="Unable to retrieve reports for space token.", + context=f"Space Token: {space_token}, Error: {str(http_error)}", ) return reports @@ -1337,9 +1341,9 @@ def _get_queries(self, report_token: str) -> list: queries = queries_json.get("_embedded", {}).get("queries", {}) except HTTPError as http_error: self.report.report_failure( - key=f"mode-query-{report_token}", - reason=f"Unable to retrieve queries for report token: {report_token}, " - f"Reason: {str(http_error)}", + title="Failed to Retrieve Queries", + message="Unable to retrieve queries for report token.", + context=f"Report Token: {report_token}, Error: {str(http_error)}", ) return queries @@ -1354,9 +1358,9 @@ def _get_last_query_run( queries = queries_json.get("_embedded", {}).get("queries", {}) except HTTPError as http_error: self.report.report_failure( - key=f"mode-query-{report_token}", - reason=f"Unable to retrieve queries for report token: {report_token}, " - f"Reason: {str(http_error)}", + title="Failed to Retrieve Queries for Report", + message="Unable to retrieve queries for report token.", + context=f"Report Token:{report_token}, Error: {str(http_error)}", ) return {} return queries @@ -1372,11 +1376,11 @@ def _get_charts(self, report_token: str, query_token: str) -> list: charts = charts_json.get("_embedded", {}).get("charts", {}) except HTTPError as http_error: self.report.report_failure( - key=f"mode-chart-{report_token}-{query_token}", - reason=f"Unable to retrieve charts: " - f"Report token: {report_token} " + title="Failed to Retrieve Charts", + message="Unable to retrieve charts from Mode.", + context=f"Report Token: {report_token}, " f"Query token: {query_token}, " - f"Reason: {str(http_error)}", + f"Error: {str(http_error)}", ) return charts diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index af6b44677dffac..7ce3b5bc34da2f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -317,8 +317,9 @@ def get_pymongo_type_string( try: type_string = PYMONGO_TYPE_TO_MONGO_TYPE[field_type] except KeyError: - self.report.report_warning( - collection_name, f"unable to map type {field_type} to metadata schema" + self.report.warning( + message="Unrecognized column types found", + context=f"Collection: {collection_name}, field type {field_type}", ) PYMONGO_TYPE_TO_MONGO_TYPE[field_type] = "unknown" type_string = "unknown" @@ -341,8 +342,9 @@ def get_field_type( TypeClass: Optional[Type] = _field_type_mapping.get(field_type) if TypeClass is None: - self.report.report_warning( - collection_name, f"unable to map type {field_type} to metadata schema" + self.report.warning( + message="Unrecognized column type found", + context=f"Collection: {collection_name}, field type {field_type}", ) TypeClass = NullTypeClass @@ -418,8 +420,9 @@ def 
get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if collection_schema_size > max_schema_size: # downsample the schema, using frequency as the sort key self.report.report_warning( - key=dataset_urn, - reason=f"Downsampling the collection schema because it has {collection_schema_size} fields. Threshold is {max_schema_size}", + title="Too many schema fields", + message=f"Downsampling the collection schema because it has too many schema fields. Configured threshold is {max_schema_size}", + context=f"Schema Size: {collection_schema_size}, Collection: {dataset_urn}", ) # Add this information to the custom properties so user can know they are looking at downsampled schema dataset_properties.customProperties[ diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index d650a9b1d1b4c2..1ad55407e2750c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -570,9 +570,9 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) if not pg_response.ok: - self.report_warning( - self.config.site_url, + self.report.warning( "Failed to get process group flow " + pg.get("id"), + self.config.site_url, ) continue @@ -619,11 +619,11 @@ def update_flow_keep_only_ingress_egress(self): or c.name.startswith("Fetch") or c.name.startswith("Put") ): - self.report_warning( - self.config.site_url, + self.report.warning( f"Dropping NiFi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \ This is likely an Ingress or Egress node which may be reading to/writing from external datasets \ However not currently supported in datahub", + self.config.site_url, ) else: logger.debug( @@ -734,10 +734,10 @@ def fetch_provenance_events( processor, eventType, startDate, oldest_event_time ) else: - self.report_warning( - self.config.site_url, - f"provenance events could not be fetched for processor \ + self.report.warning( + f"Provenance events could not be fetched for processor \ {processor.id} of type {processor.name}", + self.config.site_url, ) logger.warning(provenance_response.text) return @@ -794,10 +794,6 @@ def submit_provenance_query(self, processor, eventType, startDate, endDate): return provenance_response - def report_warning(self, key: str, reason: str) -> None: - logger.warning(f"{key}: {reason}") - self.report.report_warning(key, reason) - def delete_provenance(self, provenance_uri): delete_response = self.session.delete(provenance_uri) if not delete_response.ok: @@ -826,7 +822,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 outgoing = list( filter(lambda x: x[0] == component.id, self.nifi_flow.connections) ) - inputJobs = [] + inputJobs = set() jobProperties = None if component.nifi_type is NifiType.PROCESSOR: @@ -877,7 +873,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 dataset_urn, ) else: - inputJobs.append( + inputJobs.add( builder.make_data_job_urn_with_flow(flow_urn, incoming_from) ) @@ -900,11 +896,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 site_urls = component.target_uris.split(",") # type: ignore for site_url in site_urls: if site_url not in self.config.site_url_to_site_name: - self.report_warning( - site_url, + self.report.warning( f"Site with url {site_url} is being used in flow but\ corresponding site name is not configured via site_url_to_site_name.\ This may result in broken lineage.", + site_url, ) else: site_name = 
self.config.site_url_to_site_name[site_url] @@ -921,11 +917,11 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 site_urls = component.target_uris.split(",") # type: ignore for site_url in site_urls: if site_url not in self.config.site_url_to_site_name: - self.report_warning( - self.config.site_url, + self.report.warning( f"Site with url {site_url} is being used in flow but\ corresponding site name is not configured via site_url_to_site_name.\ This may result in broken lineage.", + self.config.site_url, ) else: site_name = self.config.site_url_to_site_name[site_url] @@ -957,7 +953,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 job_properties=jobProperties, inlets=list(component.inlets.keys()), outlets=list(component.outlets.keys()), - inputJobs=inputJobs, + inputJobs=list(inputJobs), status=component.status, ) @@ -1054,17 +1050,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: self.authenticate() except Exception as e: - logger.error("Failed to authenticate", exc_info=e) - self.report.report_failure(self.config.site_url, "Failed to authenticate") + self.report.failure("Failed to authenticate", self.config.site_url, exc=e) return # Creates nifi_flow by invoking /flow rest api and saves as self.nifi_flow try: self.create_nifi_flow() except Exception as e: - logger.error("Failed to get root process group flow", exc_info=e) - self.report.report_failure( - self.config.site_url, "Failed to get root process group flow" + self.report.failure( + "Failed to get root process group flow", self.config.site_url, exc=e ) return diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index 0aa524547bd8ed..8289265483d598 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -184,27 +184,40 @@ def __init__(self, config: OpenApiConfig, ctx: PipelineContext, platform: str): self.report = SourceReport() self.url_basepath = "" - def report_bad_responses(self, status_code: int, key: str) -> None: + def report_bad_responses(self, status_code: int, type: str) -> None: if status_code == 400: self.report.report_warning( - key=key, reason="Unknown error for reaching endpoint" + title="Failed to Extract Metadata", + message="Bad request body when retrieving data from OpenAPI endpoint", + context=f"Endpoint Type: {type}, Status Code: {status_code}", ) elif status_code == 403: - self.report.report_warning(key=key, reason="Not authorised to get endpoint") + self.report.report_warning( + title="Unauthorized to Extract Metadata", + message="Received unauthorized response when attempting to retrieve data from OpenAPI endpoint", + context=f"Endpoint Type: {type}, Status Code: {status_code}", + ) elif status_code == 404: self.report.report_warning( - key=key, - reason="Unable to find an example for endpoint. Please add it to the list of forced examples.", + title="Failed to Extract Metadata", + message="Unable to find an example for endpoint. 
Please add it to the list of forced examples.", + context=f"Endpoint Type: {type}, Status Code: {status_code}", ) elif status_code == 500: self.report.report_warning( - key=key, reason="Server error for reaching endpoint" + title="Failed to Extract Metadata", + message="Received unknown server error from OpenAPI endpoint", + context=f"Endpoint Type: {type}, Status Code: {status_code}", ) elif status_code == 504: - self.report.report_warning(key=key, reason="Timeout for reaching endpoint") + self.report.report_warning( + title="Failed to Extract Metadata", + message="Timed out when attempting to retrieve data from OpenAPI endpoint", + context=f"Endpoint Type: {type}, Status Code: {status_code}", + ) else: raise Exception( - f"Unable to retrieve endpoint, response code {status_code}, key {key}" + f"Unable to retrieve endpoint, response code {status_code}, key {type}" ) def init_dataset( @@ -271,7 +284,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 for w in warn_c: w_msg = w.message w_spl = w_msg.args[0].split(" --- ") # type: ignore - self.report.report_warning(key=w_spl[1], reason=w_spl[0]) + self.report.report_warning(message=w_spl[1], context=w_spl[0]) # here we put a sample from the "listing endpoint". To be used for later guessing of comosed endpoints. root_dataset_samples = {} @@ -293,8 +306,9 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 yield self.build_wu(dataset_snapshot, dataset_name) elif endpoint_dets["method"] != "get": self.report.report_warning( - key=endpoint_k, - reason=f"No example provided for {endpoint_dets['method']}", + title="Failed to Extract Endpoint Metadata", + message=f"No example provided for {endpoint_dets['method']}", + context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) continue # Only test endpoints if they're GETs elif ( @@ -319,13 +333,16 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 response, dataset_name ) if not fields2add: - self.report.report_warning(key=endpoint_k, reason="No Fields") + self.report.info( + message="No fields found from endpoint response.", + context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", + ) schema_metadata = set_metadata(dataset_name, fields2add) dataset_snapshot.aspects.append(schema_metadata) yield self.build_wu(dataset_snapshot, dataset_name) else: - self.report_bad_responses(response.status_code, key=endpoint_k) + self.report_bad_responses(response.status_code, type=endpoint_k) else: if endpoint_k not in config.forced_examples.keys(): # start guessing... 
@@ -347,15 +364,16 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) if not fields2add: - self.report.report_warning( - key=endpoint_k, reason="No Fields" + self.report.info( + message="No fields found from endpoint response.", + context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) schema_metadata = set_metadata(dataset_name, fields2add) dataset_snapshot.aspects.append(schema_metadata) yield self.build_wu(dataset_snapshot, dataset_name) else: - self.report_bad_responses(response.status_code, key=endpoint_k) + self.report_bad_responses(response.status_code, type=endpoint_k) else: composed_url = compose_url_attr( raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k] @@ -377,15 +395,16 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 if response.status_code == 200: fields2add, _ = extract_fields(response, dataset_name) if not fields2add: - self.report.report_warning( - key=endpoint_k, reason="No Fields" + self.report.info( + message="No fields found from endpoint response.", + context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) schema_metadata = set_metadata(dataset_name, fields2add) dataset_snapshot.aspects.append(schema_metadata) yield self.build_wu(dataset_snapshot, dataset_name) else: - self.report_bad_responses(response.status_code, key=endpoint_k) + self.report_bad_responses(response.status_code, type=endpoint_k) def get_report(self): return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py index c82b1c030e2430..b833a3691c0818 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redash.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py @@ -659,7 +659,7 @@ def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str: viz_type = viz_data.get("type", "") viz_options = viz_data.get("options", {}) globalSeriesType = viz_options.get("globalSeriesType", "") - report_key = f"redash-chart-{viz_data['id']}" + report_type = f"redash-chart-{viz_data['id']}" # handle Plotly chart types if viz_type == "CHART": @@ -667,14 +667,14 @@ def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str: if chart_type is None: chart_type = DEFAULT_VISUALIZATION_TYPE message = f"ChartTypeClass for Redash Visualization Type={viz_type} with options.globalSeriesType={globalSeriesType} is missing. Setting to {DEFAULT_VISUALIZATION_TYPE}" - self.report.report_warning(key=report_key, reason=message) + self.report.report_warning(title=report_type, message=message) logger.warning(message) else: chart_type = VISUALIZATION_TYPE_MAP.get(viz_type) if chart_type is None: chart_type = DEFAULT_VISUALIZATION_TYPE message = f"ChartTypeClass for Redash Visualization Type={viz_type} is missing. Setting to {DEFAULT_VISUALIZATION_TYPE}" - self.report.report_warning(key=report_key, reason=message) + self.report.report_warning(title=report_type, message=message) logger.warning(message) return chart_type @@ -713,8 +713,8 @@ def _get_chart_snapshot(self, query_data: Dict, viz_data: Dict) -> ChartSnapshot self.report.charts_no_input.add(chart_urn) self.report.queries_no_dataset.add(str(query_id)) self.report.report_warning( - key="redash-chart-input-missing", - reason=f"For viz-id-{viz_id}-query-{query_id}-datasource-{data_source_id} data_source_type={data_source_type} no datasources found. 
Setting inputs to None", + title="redash-chart-input-missing", + message=f"For viz-id-{viz_id}-query-{query_id}-datasource-{data_source_id} data_source_type={data_source_type} no datasources found. Setting inputs to None", ) chart_info = ChartInfoClass( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index cd54ab774418e6..2527ca6bc76c1a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -129,8 +129,6 @@ logger: logging.Logger = logging.getLogger(__name__) -MISSING_COLUMN_INFO = "missing column information" - @dataclass class SQLSourceReport(StaleEntityRemovalSourceReport, ClassificationReportMixin): @@ -921,7 +919,7 @@ def _get_columns( try: columns = inspector.get_columns(table, schema) if len(columns) == 0: - self.warn(logger, MISSING_COLUMN_INFO, dataset_name) + self.warn(logger, "missing column information", dataset_name) except Exception as e: self.warn( logger, @@ -1258,13 +1256,6 @@ def loop_profiler_requests( logger.debug(f"{dataset_name} has already been seen, skipping...") continue - missing_column_info_warn = self.report.warnings.get(MISSING_COLUMN_INFO) - if ( - missing_column_info_warn is not None - and dataset_name in missing_column_info_warn - ): - continue - (partition, custom_sql) = self.generate_partition_profiler_query( schema, table, self.config.profiling.partition_datetime ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py index 5b1a815e178b18..ae56fb87ee5281 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py @@ -52,7 +52,6 @@ if TYPE_CHECKING: from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest -MISSING_COLUMN_INFO = "missing column information" logger: logging.Logger = logging.getLogger(__name__) @@ -542,12 +541,7 @@ def loop_profiler_requests( else: logger.debug(f"{dataset_name} has already been seen, skipping...") continue - missing_column_info_warn = self.report.warnings.get(MISSING_COLUMN_INFO) - if ( - missing_column_info_warn is not None - and dataset_name in missing_column_info_warn - ): - continue + (partition, custom_sql) = self.generate_partition_profiler_query( schema, projection, self.config.profiling.partition_datetime ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index c19d48c3dc4ac3..8d37b39589087a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -800,8 +800,9 @@ def _authenticate(self) -> None: # Note that we're not catching ConfigurationError, since we want that to throw. 
except ValueError as e: self.report.failure( - key="tableau-login", - reason=str(e), + title="Tableau Login Error", + message="Failed to authenticate with Tableau.", + exc=e, ) def get_data_platform_instance(self) -> DataPlatformInstanceClass: @@ -881,7 +882,10 @@ def get_connection_object_page( error and (error.get(c.EXTENSIONS) or {}).get(c.SEVERITY) == c.WARNING for error in errors ): - self.report.warning(key=connection_type, reason=f"{errors}") + self.report.warning( + message=f"Received error fetching Query Connection {connection_type}", + context=f"Errors: {errors}", + ) else: raise RuntimeError(f"Query {connection_type} error: {errors}") @@ -2821,8 +2825,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.emit_upstream_tables() except MetadataQueryException as md_exception: self.report.failure( - key="tableau-metadata", - reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}", + title="Failed to Retrieve Tableau Metadata", + message="Unable to retrieve metadata from tableau.", + context=str(md_exception), ) def get_report(self) -> TableauSourceReport: diff --git a/metadata-ingestion/src/datahub/utilities/lossy_collections.py b/metadata-ingestion/src/datahub/utilities/lossy_collections.py index 6f5f4bda369a06..f71aad51ab0b6b 100644 --- a/metadata-ingestion/src/datahub/utilities/lossy_collections.py +++ b/metadata-ingestion/src/datahub/utilities/lossy_collections.py @@ -58,11 +58,19 @@ def __get_pydantic_core_schema__(cls, source_type, handler): # type: ignore return core_schema.no_info_after_validator_function(cls, handler(list)) def as_obj(self) -> List[Union[T, str]]: - base_list: List[Union[T, str]] = list(self.__iter__()) + from datahub.ingestion.api.report import Report + + base_list: List[Union[T, str]] = [ + Report.to_pure_python_obj(value) for value in list(self.__iter__()) + ] if self.sampled: base_list.append(f"... sampled of {self.total_elements} total elements") return base_list + def set_total(self, total: int) -> None: + self.total_elements = total + self.sampled = self.total_elements > self.max_elements + class LossySet(Set[T], Generic[T]): """A set that only preserves a sample of elements in a set. Currently this is a very simple greedy sampling set""" @@ -145,9 +153,13 @@ def as_obj(self) -> Dict[Union[_KT, str], Union[_VT, str]]: if self.sampled: base_dict[ "sampled" - ] = f"{len(self.keys())} sampled of at most {self.max_elements + self._overflow} entries." + ] = f"{len(self.keys())} sampled of at most {self.total_key_count()} entries." 
return base_dict + def total_key_count(self) -> int: + """Returns the total number of keys that have been added to this dictionary.""" + return super().__len__() + self._overflow + def dropped_keys_count(self) -> int: """Returns the number of keys that have been dropped from this dictionary.""" return self._overflow diff --git a/metadata-ingestion/tests/integration/metabase/test_metabase.py b/metadata-ingestion/tests/integration/metabase/test_metabase.py index 5c433f14f380ff..2d67f0ca5223f8 100644 --- a/metadata-ingestion/tests/integration/metabase/test_metabase.py +++ b/metadata-ingestion/tests/integration/metabase/test_metabase.py @@ -163,7 +163,7 @@ def test_pipeline(pytestconfig, tmp_path): @freeze_time(FROZEN_TIME) -def test_mode_ingest_success( +def test_metabase_ingest_success( pytestconfig, tmp_path, test_pipeline, mock_datahub_graph, default_json_response_map ): with patch( @@ -260,7 +260,7 @@ def test_stateful_ingestion( @freeze_time(FROZEN_TIME) -def test_mode_ingest_failure(pytestconfig, tmp_path, default_json_response_map): +def test_metabase_ingest_failure(pytestconfig, tmp_path, default_json_response_map): with patch( "datahub.ingestion.source.metabase.requests.session", side_effect=MockResponse.build_mocked_requests_failure( diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index def7277494fe7e..0346767b05d253 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -139,7 +139,3 @@ def test_mode_ingest_failure(pytestconfig, tmp_path): except PipelineExecutionError as exec_error: assert exec_error.args[0] == "Source reported errors" assert len(exec_error.args[1].failures) == 1 - assert ( - list(exec_error.args[1].failures.keys())[0] - == "mode-report-75737b70402e" - ) diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index 23f5c10b10f8e8..32c2a63c3ac593 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -74,7 +74,9 @@ def test_snowflake_missing_role_access_causes_pipeline_failure( pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() + assert "permission-error" in [ + failure.message for failure in pipeline.source.get_report().failures + ] @freeze_time(FROZEN_TIME) @@ -96,7 +98,9 @@ def test_snowflake_missing_warehouse_access_causes_pipeline_failure( ) pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() + assert "permission-error" in [ + failure.message for failure in pipeline.source.get_report().failures + ] @freeze_time(FROZEN_TIME) @@ -118,7 +122,9 @@ def test_snowflake_no_databases_with_access_causes_pipeline_failure( ) pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() + assert "permission-error" in [ + failure.message for failure in pipeline.source.get_report().failures + ] @freeze_time(FROZEN_TIME) @@ -146,7 +152,9 @@ def test_snowflake_no_tables_causes_pipeline_failure( pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() + assert "permission-error" in [ + 
failure.message for failure in pipeline.source.get_report().failures + ] @freeze_time(FROZEN_TIME) @@ -171,10 +179,9 @@ def test_snowflake_list_columns_error_causes_pipeline_warning( pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() pipeline.raise_from_status() # pipeline should not fail - assert ( - "Failed to get columns for table" - in pipeline.source.get_report().warnings.keys() - ) + assert "Failed to get columns for table" in [ + warning.message for warning in pipeline.source.get_report().warnings + ] @freeze_time(FROZEN_TIME) @@ -197,10 +204,9 @@ def test_snowflake_list_primary_keys_error_causes_pipeline_warning( pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() pipeline.raise_from_status() # pipeline should not fail - assert ( - "Failed to get primary key for table" - in pipeline.source.get_report().warnings.keys() - ) + assert "Failed to get primary key for table" in [ + warning.message for warning in pipeline.source.get_report().warnings + ] @freeze_time(FROZEN_TIME) @@ -229,9 +235,9 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( ) pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert ( - "lineage-permission-error" in pipeline.source.get_report().failures.keys() - ) + assert "lineage-permission-error" in [ + failure.message for failure in pipeline.source.get_report().failures + ] @freeze_time(FROZEN_TIME) @@ -253,4 +259,6 @@ def test_snowflake_missing_snowflake_operations_permission_causes_pipeline_failu ) pipeline = Pipeline(snowflake_pipeline_config) pipeline.run() - assert "usage-permission-error" in pipeline.source.get_report().failures.keys() + assert "usage-permission-error" in [ + failure.message for failure in pipeline.source.get_report().failures + ] diff --git a/metadata-ingestion/tests/unit/api/test_report.py b/metadata-ingestion/tests/unit/api/test_report.py index a885a93a0a396a..acb4f055aa8280 100644 --- a/metadata-ingestion/tests/unit/api/test_report.py +++ b/metadata-ingestion/tests/unit/api/test_report.py @@ -8,8 +8,8 @@ def test_report_to_string_unsampled(): source_report.report_warning("key1", "problem 1") source_report.report_failure("key2", "reason 2") str = source_report.as_string() - assert "'warnings': {'key1': ['problem 1']}" in str - assert "'failures': {'key2': ['reason 2']}" in str + assert "'warnings': [{'message': 'key1', 'context': ['problem 1']}]" in str + assert "'failures': [{'message': 'key2', 'context': ['reason 2']}]" in str def test_report_to_string_sampled(): @@ -18,7 +18,7 @@ def test_report_to_string_sampled(): source_report.report_warning(f"key{i}", "Test message") str = source_report.as_string() - assert "'sampled': '10 sampled of at most 100 entries.'" in str + assert "sampled of 100 total elements" in str str = source_report.as_json() print(str) report_as_dict = json.loads(str) diff --git a/metadata-ingestion/tests/unit/test_nifi_source.py b/metadata-ingestion/tests/unit/test_nifi_source.py index 26c803927f9e67..9e8bf64261ffaf 100644 --- a/metadata-ingestion/tests/unit/test_nifi_source.py +++ b/metadata-ingestion/tests/unit/test_nifi_source.py @@ -333,9 +333,10 @@ def test_single_user_auth_failed_to_get_token(): list(source.get_workunits()) assert source.get_report().failures - assert "Failed to authenticate" in list( - source.get_report().failures[config.site_url] - ) + + assert "Failed to authenticate" in [ + failure.message for failure in source.get_report().failures + ] def test_kerberos_auth_failed_to_get_token(): @@ -352,9 +353,9 @@ def 
test_kerberos_auth_failed_to_get_token(): list(source.get_workunits()) assert source.get_report().failures - assert "Failed to authenticate" in list( - source.get_report().failures[config.site_url] - ) + assert "Failed to authenticate" in [ + failure.message for failure in source.get_report().failures + ] def test_client_cert_auth_failed(): @@ -372,9 +373,9 @@ def test_client_cert_auth_failed(): list(source.get_workunits()) assert source.get_report().failures - assert "Failed to authenticate" in list( - source.get_report().failures[config.site_url] - ) + assert "Failed to authenticate" in [ + failure.message for failure in source.get_report().failures + ] def test_failure_to_create_nifi_flow(): @@ -392,9 +393,9 @@ def test_failure_to_create_nifi_flow(): list(source.get_workunits()) assert source.get_report().failures - assert "Failed to get root process group flow" in list( - source.get_report().failures[config.site_url] - ) + assert "Failed to get root process group flow" in [ + failure.message for failure in source.get_report().failures + ] def test_site_url_no_context(): diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index 851350933f59e5..01403163bbfbdf 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -31,15 +31,16 @@ class TestPipeline: + @patch("confluent_kafka.Consumer", autospec=True) @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) @patch("datahub.ingestion.sink.console.ConsoleSink.close", autospec=True) @freeze_time(FROZEN_TIME) - def test_configure(self, mock_sink, mock_source): + def test_configure(self, mock_sink, mock_source, mock_consumer): pipeline = Pipeline.create( { "source": { "type": "kafka", - "config": {"connection": {"bootstrap": "localhost:9092"}}, + "config": {"connection": {"bootstrap": "fake-dns-name:9092"}}, }, "sink": {"type": "console"}, } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/filter/RestliLoggingFilter.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/filter/RestliLoggingFilter.java index edd8270e87210e..eee64b3f24bb9a 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/filter/RestliLoggingFilter.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/filter/RestliLoggingFilter.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.filter; +import com.linkedin.metadata.restli.NonExceptionHttpErrorResponse; import com.linkedin.restli.common.HttpMethod; import com.linkedin.restli.common.HttpStatus; import com.linkedin.restli.server.filter.Filter; @@ -33,7 +34,9 @@ public CompletableFuture onError( final FilterRequestContext requestContext, final FilterResponseContext responseContext) { logResponse(requestContext, responseContext); - log.error("Rest.li error: ", th); + if (!(th instanceof NonExceptionHttpErrorResponse)) { + log.error("Rest.li error: ", th); + } return CompletableFuture.completedFuture(null); } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 0d9a49d583b57a..cbca464d569a83 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ 
b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -13,7 +13,6 @@ import com.codahale.metrics.MetricRegistry; import com.datahub.authentication.Authentication; import com.datahub.authentication.AuthenticationContext; -import com.datahub.authorization.EntitySpec; import com.datahub.plugins.auth.authorization.Authorizer; import com.google.common.annotations.VisibleForTesting; import com.linkedin.aspect.GetTimeseriesAspectValuesResponse; @@ -21,7 +20,6 @@ import com.linkedin.common.urn.Urn; import com.linkedin.metadata.aspect.EnvelopedAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; -import com.linkedin.metadata.authorization.Disjunctive; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.IngestResult; @@ -153,9 +151,8 @@ public Task get( final VersionedAspect aspect = _entityService.getVersionedAspect(opContext, urn, aspectName, version); if (aspect == null) { - throw RestliUtil.resourceNotFoundException( - String.format( - "Did not find urn: %s aspect: %s version: %s", urn, aspectName, version)); + log.warn("Did not find urn: {} aspect: {} version: {}", urn, aspectName, version); + throw RestliUtil.nonExceptionResourceNotFound(); } return new AnyRecord(aspect.data()); }, diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/restli/NonExceptionHttpErrorResponse.java b/metadata-utils/src/main/java/com/linkedin/metadata/restli/NonExceptionHttpErrorResponse.java new file mode 100644 index 00000000000000..c99a3622d15207 --- /dev/null +++ b/metadata-utils/src/main/java/com/linkedin/metadata/restli/NonExceptionHttpErrorResponse.java @@ -0,0 +1,47 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.restli.common.HttpStatus; +import com.linkedin.restli.server.RestLiServiceException; +import com.linkedin.restli.server.errors.ServiceError; + +/** + * Captures an error response (e.g. 404-not-found) that is not to be regarded as an + * exception within the server.
+ *
+ * Restli apparently requires http-error-responses to be represented by {@link
+ RestLiServiceException}; thus, we need this class to specify an error response that isn't
+ really an exception (in the context of the server).
+ * To highlight the unusual purpose of this exception, the name of this class is also deliberately + * unconventional (the class-name doesn't end with "Exception"). + */ +public class NonExceptionHttpErrorResponse extends RestLiServiceException { + + public NonExceptionHttpErrorResponse(HttpStatus status) { + super(status); + } + + public NonExceptionHttpErrorResponse(HttpStatus status, String message) { + super(status, message); + } + + public NonExceptionHttpErrorResponse(HttpStatus status, Throwable cause) { + super(status, cause); + } + + public NonExceptionHttpErrorResponse(HttpStatus status, String message, Throwable cause) { + super(status, message, cause); + } + + public NonExceptionHttpErrorResponse( + HttpStatus status, String message, Throwable cause, boolean writableStackTrace) { + super(status, message, cause, writableStackTrace); + } + + public NonExceptionHttpErrorResponse(ServiceError serviceError) { + super(serviceError); + } + + public NonExceptionHttpErrorResponse(ServiceError serviceError, Throwable cause) { + super(serviceError, cause); + } +} diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/restli/RestliUtil.java b/metadata-utils/src/main/java/com/linkedin/metadata/restli/RestliUtil.java index 737f79dc1c4417..c9b1d5a8a82de5 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/restli/RestliUtil.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/restli/RestliUtil.java @@ -79,6 +79,11 @@ public static RestLiServiceException resourceNotFoundException() { return resourceNotFoundException(null); } + @Nonnull + public static RestLiServiceException nonExceptionResourceNotFound() { + return new NonExceptionHttpErrorResponse(HttpStatus.S_404_NOT_FOUND); + } + @Nonnull public static RestLiServiceException resourceNotFoundException(@Nullable String message) { return new RestLiServiceException(HttpStatus.S_404_NOT_FOUND, message); diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/log/LogMessageFilter.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/log/LogMessageFilter.java index 67f0ae4c77eafd..3750c0ae0bc4ad 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/log/LogMessageFilter.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/log/LogMessageFilter.java @@ -1,6 +1,8 @@ package com.linkedin.metadata.utils.log; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.classic.spi.ThrowableProxyUtil; import ch.qos.logback.core.filter.AbstractMatcherFilter; import ch.qos.logback.core.spi.FilterReply; import java.util.ArrayList; @@ -21,7 +23,21 @@ public FilterReply decide(ILoggingEvent event) { return FilterReply.NEUTRAL; } - if (this.excluded.stream().anyMatch(message -> event.getFormattedMessage().contains(message))) { + final String formattedMessage = event.getFormattedMessage(); + final IThrowableProxy throwableProxy = event.getThrowableProxy(); + + String throwableString; + if (throwableProxy != null) { + throwableString = ThrowableProxyUtil.asString(throwableProxy); + } else { + throwableString = null; + } + + if (this.excluded.stream() + .anyMatch( + message -> + formattedMessage.contains(message) + || (throwableString != null && throwableString.contains(message)))) { return FilterReply.DENY; } return FilterReply.ACCEPT;