Skip to content

Commit

Permalink
fix(ingest/mode): Handle 204 response and invalid json (datahub-proje…
Browse files Browse the repository at this point in the history
…ct#12156)

Co-authored-by: Aseem Bansal <[email protected]>
  • Loading branch information
asikowitz and anshbansal authored Dec 24, 2024
1 parent 48736a0 commit f4b33b5
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 36 deletions.
46 changes: 26 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
from json import JSONDecodeError
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import dateutil.parser as dp
Expand Down Expand Up @@ -193,6 +194,9 @@ class HTTPError429(HTTPError):
pass


ModeRequestError = (HTTPError, JSONDecodeError)


@dataclass
class ModeSourceReport(StaleEntityRemovalSourceReport):
filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList)
Expand Down Expand Up @@ -328,11 +332,11 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
# Test the connection
try:
self._get_request_json(f"{self.config.connect_uri}/api/verify")
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Connect",
message="Unable to verify connection to mode.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)

self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}"
Expand Down Expand Up @@ -521,11 +525,11 @@ def _get_creator(self, href: str) -> Optional[str]:
if self.config.owner_username_instead_of_email
else user_json.get("email")
)
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_warning(
title="Failed to retrieve Mode creator",
message=f"Unable to retrieve user for {href}",
context=f"Reason: {str(http_error)}",
context=f"Reason: {str(e)}",
)
return user

Expand Down Expand Up @@ -571,11 +575,11 @@ def _get_space_name_and_tokens(self) -> dict:
logging.debug(f"Skipping space {space_name} due to space pattern")
continue
space_info[s.get("token", "")] = s.get("name", "")
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Spaces",
message="Unable to retrieve spaces / collections for workspace.",
context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}",
context=f"Workspace: {self.workspace_uri}, Error: {str(e)}",
)

return space_info
Expand Down Expand Up @@ -721,11 +725,11 @@ def _get_data_sources(self) -> List[dict]:
try:
ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources")
data_sources = ds_json.get("_embedded", {}).get("data_sources", [])
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to retrieve Data Sources",
message="Unable to retrieve data sources from Mode.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)

return data_sources
Expand Down Expand Up @@ -812,11 +816,11 @@ def _get_definition(self, definition_name):
if definition.get("name", "") == definition_name:
return definition.get("source", "")

except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Definition",
message="Unable to retrieve definition from Mode.",
context=f"Definition Name: {definition_name}, Error: {str(http_error)}",
context=f"Definition Name: {definition_name}, Error: {str(e)}",
)
return None

Expand Down Expand Up @@ -1382,11 +1386,11 @@ def _get_reports(self, space_token: str) -> List[dict]:
f"{self.workspace_uri}/spaces/{space_token}/reports"
)
reports = reports_json.get("_embedded", {}).get("reports", {})
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Reports for Space",
message="Unable to retrieve reports for space token.",
context=f"Space Token: {space_token}, Error: {str(http_error)}",
context=f"Space Token: {space_token}, Error: {str(e)}",
)
return reports

Expand All @@ -1400,11 +1404,11 @@ def _get_datasets(self, space_token: str) -> List[dict]:
url = f"{self.workspace_uri}/spaces/{space_token}/datasets"
datasets_json = self._get_request_json(url)
datasets = datasets_json.get("_embedded", {}).get("reports", [])
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Datasets for Space",
message=f"Unable to retrieve datasets for space token {space_token}.",
context=f"Error: {str(http_error)}",
context=f"Error: {str(e)}",
)
return datasets

Expand All @@ -1416,11 +1420,11 @@ def _get_queries(self, report_token: str) -> list:
f"{self.workspace_uri}/reports/{report_token}/queries"
)
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Queries",
message="Unable to retrieve queries for report token.",
context=f"Report Token: {report_token}, Error: {str(http_error)}",
context=f"Report Token: {report_token}, Error: {str(e)}",
)
return queries

Expand All @@ -1433,11 +1437,11 @@ def _get_last_query_run(
f"{self.workspace_uri}/reports/{report_token}/runs/{report_run_id}/query_runs{query_run_id}"
)
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Queries for Report",
message="Unable to retrieve queries for report token.",
context=f"Report Token:{report_token}, Error: {str(http_error)}",
context=f"Report Token:{report_token}, Error: {str(e)}",
)
return {}
return queries
Expand All @@ -1451,13 +1455,13 @@ def _get_charts(self, report_token: str, query_token: str) -> list:
f"/queries/{query_token}/charts"
)
charts = charts_json.get("_embedded", {}).get("charts", {})
except HTTPError as http_error:
except ModeRequestError as e:
self.report.report_failure(
title="Failed to Retrieve Charts",
message="Unable to retrieve charts from Mode.",
context=f"Report Token: {report_token}, "
f"Query token: {query_token}, "
f"Error: {str(http_error)}",
f"Error: {str(e)}",
)
return charts

Expand All @@ -1477,6 +1481,8 @@ def get_request():
response = self.session.get(
url, timeout=self.config.api_options.timeout
)
if response.status_code == 204: # No content, don't parse json
return {}
return response.json()
except HTTPError as http_error:
error_response = http_error.response
Expand Down
141 changes: 125 additions & 16 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import json
import pathlib
from typing import Sequence
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from requests.models import HTTPError

from datahub.configuration.common import PipelineExecutionError
from datahub.ingestion.api.source import StructuredLogEntry
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers

Expand All @@ -28,7 +31,7 @@
"https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json",
}

RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"]
ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports"

test_resources_dir = pathlib.Path(__file__).parent

Expand All @@ -49,6 +52,14 @@ def mount(self, prefix, adaptor):
return self

def get(self, url, timeout=40):
if self.error_list is not None and self.url in self.error_list:
http_error_msg = "{} Client Error: {} for url: {}".format(
400,
"Simulate error",
self.url,
)
raise HTTPError(http_error_msg, response=self)

self.url = url
self.timeout = timeout
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
Expand All @@ -57,29 +68,46 @@ def get(self, url, timeout=40):
self.json_data = data
return self

def raise_for_status(self):
if self.error_list is not None and self.url in self.error_list:
http_error_msg = "{} Client Error: {} for url: {}".format(
400,
"Simulate error",
self.url,
)
raise HTTPError(http_error_msg, response=self)

class MockResponseJson(MockResponse):
def __init__(
self,
status_code: int = 200,
*,
json_empty_list: Sequence[str] = (),
json_error_list: Sequence[str] = (),
):
super().__init__(None, status_code)
self.json_empty_list = json_empty_list
self.json_error_list = json_error_list

def json(self):
if self.url in self.json_empty_list:
return json.loads("") # Shouldn't be called
if self.url in self.json_error_list:
return json.loads("{")
return super().json()

def get(self, url, timeout=40):
response = super().get(url, timeout)
if self.url in self.json_empty_list:
response.status_code = 204
return response


def mocked_requests_sucess(*args, **kwargs):
def mocked_requests_success(*args, **kwargs):
return MockResponse(None, 200)


def mocked_requests_failure(*args, **kwargs):
return MockResponse(RESPONSE_ERROR_LIST, 200)
return MockResponse([ERROR_URL], 200)


@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_sucess,
side_effect=mocked_requests_success,
):
pipeline = Pipeline.create(
{
Expand Down Expand Up @@ -142,8 +170,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
}
)
pipeline.run()
try:
with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status()
except PipelineExecutionError as exec_error:
assert exec_error.args[0] == "Source reported errors"
assert len(exec_error.args[1].failures) == 1
assert exec_error.value.args[0] == "Source reported errors"
assert len(exec_error.value.args[1].failures) == 1
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1].failures[0]
error = next(iter(error_dict.context))
assert "Simulate error" in error
assert ERROR_URL in error


@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_empty(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_empty_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=True)


@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.Session",
side_effect=lambda *args, **kwargs: MockResponseJson(
json_error_list=["https://app.mode.com/api/modeuser"]
),
):
global test_resources_dir
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

pipeline = Pipeline.create(
{
"run_id": "mode-test",
"source": {
"type": "mode",
"config": {
"token": "xxxx",
"password": "xxxx",
"connect_uri": "https://app.mode.com/",
"workspace": "acryl",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/mode_mces.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status(raise_warnings=False)
with pytest.raises(PipelineExecutionError) as exec_error:
pipeline.raise_from_status(raise_warnings=True)
assert len(exec_error.value.args[1].warnings) > 0
error_dict: StructuredLogEntry
_level, error_dict = exec_error.value.args[1].warnings[0]
error = next(iter(error_dict.context))
assert "Expecting property name enclosed in double quotes" in error

0 comments on commit f4b33b5

Please sign in to comment.