diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 6babdc5..ffd3c66 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -26,7 +26,7 @@ env:
 
 on:
   push:
-    branches: [development, master, feature/incremental_sync_and_schema_logging]
+    branches: [development, master]
   pull_request:
     branches: [ master ]
diff --git a/target_bigquery/utils.py b/target_bigquery/utils.py
index 577d2cb..98c9fb9 100644
--- a/target_bigquery/utils.py
+++ b/target_bigquery/utils.py
@@ -6,6 +6,8 @@
 from google.api_core import exceptions
 from google.cloud import bigquery
 from google.cloud.bigquery import Dataset
+from google.cloud.exceptions import NotFound
+from google.cloud.bigquery import DatasetReference
 
 logger = singer.get_logger()
 
@@ -30,7 +32,7 @@ def emit_state(state):
 
 def ensure_dataset(project_id, dataset_id, location):
     """
-    Given a project id, dataset id and location, creates BigQuery dataset
+    Given a project id, dataset id and location, returns the BigQuery dataset, creating it first if it does not exist.
 
     https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html
 
@@ -39,17 +41,20 @@ def ensure_dataset(project_id, dataset_id, location):
     :param location, str: location for the dataset (US). Passed to bigquery.Client().
     :return: client (BigQuery Client Object) and Dataset (BigQuery dataset)
     """
-    from google.cloud.bigquery import DatasetReference
+
     client = bigquery.Client(project=project_id, location=location)
 
     dataset_ref = DatasetReference(project_id, dataset_id)
+
     try:
-        client.create_dataset(dataset_ref)
-    except exceptions.GoogleAPICallError as e:
-        if e.response.status_code == 409:  # dataset exists
-            pass
-        else:
+        dataset = client.get_dataset(dataset_ref)
+        return client, dataset
+
+    except NotFound:
+        try:
+            client.create_dataset(dataset_ref)
+        except exceptions.GoogleAPICallError as e:
             logger.critical(f"unable to create dataset {dataset_id} in project {project_id}; Exception {e}")
             return 2  # sys.exit(2)
-    return client, Dataset(dataset_ref)
+        return client, Dataset(dataset_ref)
 
diff --git a/tests/test_target_bigquery_utils.py b/tests/test_target_bigquery_utils.py
new file mode 100644
index 0000000..4c7c1c4
--- /dev/null
+++ b/tests/test_target_bigquery_utils.py
@@ -0,0 +1,79 @@
+import os
+import json
+from google.cloud.bigquery import Dataset
+from google.cloud import bigquery
+from google.cloud.exceptions import NotFound
+import pytest
+import logging
+
+from tests import unittestcore
+from target_bigquery.utils import ensure_dataset
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class TestTargetBigQueryUtils(unittestcore.BaseUnitTest):
+
+    def setUp(self):
+
+        self.config_file = os.path.join(
+            os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'),
+            'target-config.json')
+
+        with open(self.config_file) as config_file:
+            config = json.load(config_file)
+
+        self.project_id = config["project_id"]
+
+        self.dataset_id = "target_bigquery_unit_test_ensure_dataset_function"  # config["dataset_id"]
+
+        self.location = config.get("location", "US")
+
+        self.client = bigquery.Client(project=self.project_id)
+
+    def test_ensure_dataset(self):
+        """
+        Verify that ensure_dataset returns the dataset if it already exists,
+        and creates it first when it does not.
+        """
+
+        # make sure the dataset doesn't exist yet
+        logger.info("Deleting dataset if it already exists")
+        self.delete_dataset()
+
+        # assert that the dataset doesn't exist yet
+        with pytest.raises(NotFound):
+            self.client.get_dataset(self.dataset_id)  # Make an API request.
+
+        # PART 1
+        # the dataset doesn't exist yet, so ensure_dataset creates it and returns it
+        client_1, dataset_newly_created = ensure_dataset(project_id=self.project_id,
+                                                         dataset_id=self.dataset_id,
+                                                         location=self.location)
+
+        # PART 2 (identical call to part 1, but now the dataset already exists)
+        # ensure_dataset fetches and returns the existing dataset
+        client_2, dataset_already_exists = ensure_dataset(project_id=self.project_id,
+                                                          dataset_id=self.dataset_id,
+                                                          location=self.location)
+        # PART 3: checks
+        dataset_list = [dataset_newly_created, dataset_already_exists]
+
+        for next_dataset in dataset_list:
+            assert isinstance(next_dataset, Dataset)
+            assert next_dataset.project == self.project_id
+            assert next_dataset.dataset_id == self.dataset_id
+
+    def tearDown(self):
+        self.delete_dataset()
+
+    def delete_dataset(self):
+        # best-effort cleanup: it is fine if the dataset does not exist
+        try:
+            self.client.delete_dataset(
+                dataset=self.dataset_id,
+                delete_contents=True
+            )
+        except NotFound:
+            pass
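
For reviewers, a minimal usage sketch of the reworked ensure_dataset, walking the same two code paths the new test exercises. The project and dataset names below are hypothetical placeholders, not values from this change:

from target_bigquery.utils import ensure_dataset

# First call: the dataset does not exist yet (hypothetical names), so
# get_dataset raises NotFound and the create branch runs.
client, dataset = ensure_dataset(project_id="my-gcp-project",
                                 dataset_id="raw_landing",
                                 location="US")

# Second call: the dataset now exists, so get_dataset succeeds and the
# existing dataset is returned with no create attempt.
client, dataset = ensure_dataset(project_id="my-gcp-project",
                                 dataset_id="raw_landing",
                                 location="US")

print(dataset.project, dataset.dataset_id)  # my-gcp-project raw_landing

Note that on a create failure the function still returns the bare integer 2 (see the unchanged `return 2  # sys.exit(2)` context line), so a caller that unpacks `client, dataset = ensure_dataset(...)` would hit a TypeError in that case; this diff leaves that pre-existing behavior as-is.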