
Commit df59389
Merge branch 'feature/get-dataset-without-create'
# Conflicts:
#	.github/workflows/python-package.yml
RuslanBergenov committed May 21, 2022
2 parents 842923d + a7b90b4 commit df59389
Showing 3 changed files with 94 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -26,7 +26,7 @@ env:
 
 on:
   push:
-    branches: [development, master, feature/incremental_sync_and_schema_logging]
+    branches: [development, master]
   pull_request:
     branches: [ master ]
21 changes: 13 additions & 8 deletions target_bigquery/utils.py
@@ -6,6 +6,8 @@
 from google.api_core import exceptions
 from google.cloud import bigquery
 from google.cloud.bigquery import Dataset
+from google.cloud.exceptions import NotFound
+from google.cloud.bigquery import DatasetReference
 
 logger = singer.get_logger()
@@ -30,7 +32,7 @@ def emit_state(state):
 
 def ensure_dataset(project_id, dataset_id, location):
     """
-    Given a project id, dataset id and location, creates BigQuery dataset
+    Given a project id, dataset id and location, creates the BigQuery dataset if it does not already exist
     https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html
@@ -39,17 +41,20 @@ def ensure_dataset(project_id, dataset_id, location):
     :param location, str: location for the dataset (US). Passed to bigquery.Client().
     :return: client (BigQuery Client Object) and Dataset (BigQuery dataset)
     """
-    from google.cloud.bigquery import DatasetReference
-
     client = bigquery.Client(project=project_id, location=location)
 
     dataset_ref = DatasetReference(project_id, dataset_id)
 
     try:
-        client.create_dataset(dataset_ref)
-    except exceptions.GoogleAPICallError as e:
-        if e.response.status_code == 409:  # dataset exists
-            pass
-        else:
+        dataset = client.get_dataset(dataset_ref)
+        return client, dataset
+
+    except NotFound:
+        try:
+            client.create_dataset(dataset_ref)
+        except exceptions.GoogleAPICallError as e:
             logger.critical(f"unable to create dataset {dataset_id} in project {project_id}; Exception {e}")
             return 2  # sys.exit(2)
 
     return client, Dataset(dataset_ref)
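With this change, ensure_dataset tries client.get_dataset() first and only falls back to create_dataset() when the dataset is missing, instead of attempting the create and swallowing a 409 conflict. A minimal usage sketch of the merged function (the project and dataset ids below are illustrative, not from the commit):

    from target_bigquery.utils import ensure_dataset

    # Safe to call repeatedly: the first call may create the dataset,
    # later calls simply fetch it via client.get_dataset().
    client, dataset = ensure_dataset(
        project_id="my-gcp-project",  # hypothetical GCP project id
        dataset_id="my_dataset",      # hypothetical BigQuery dataset id
        location="US",
    )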
80 changes: 80 additions & 0 deletions tests/test_target_bigquery_utils.py
@@ -0,0 +1,80 @@
import os
import json
from google.cloud.bigquery import Dataset
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import pytest
import logging

from tests import unittestcore
from target_bigquery.utils import ensure_dataset

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class TestTargetBigQueryUtils(unittestcore.BaseUnitTest):

def setUp(self):

        self.config_file = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            'sandbox',
            'target-config.json')

        with open(self.config_file) as config_file:
            config = json.load(config_file)

self.project_id = config["project_id"]

self.dataset_id = "target_bigquery_unit_test_ensure_dataset_function" # config["dataset_id"]

self.location = config.get("location", "US")

self.client = bigquery.Client(project=self.project_id)

    def test_ensure_dataset(self):
        """
        Verify that ensure_dataset returns the dataset if it already exists;
        if the dataset does not exist, it is created and then returned.
        """

        # make sure the dataset doesn't exist yet
        logger.info("Deleting the dataset so it doesn't exist yet")
self.delete_dataset()

# assert that dataset doesn't exist yet
with pytest.raises(NotFound):
self.client.get_dataset(self.dataset_id) # Make an API request.

# PART 1
# create dataset and get dataset
client_1, dataset_newly_created = ensure_dataset(project_id=self.project_id,
dataset_id=self.dataset_id,
location=self.location)

# PART 2 (identical code to part 1, but now the dataset already exists)
# get dataset if dataset already exists
client_2, dataset_already_exists = ensure_dataset(project_id=self.project_id,
dataset_id=self.dataset_id,
location=self.location)
# PART 3: checks
dataset_list = [dataset_newly_created, dataset_already_exists]

for next_dataset in dataset_list:
dataset_dict = next_dataset.__dict__

            assert isinstance(next_dataset, Dataset)
assert dataset_dict["_properties"]["datasetReference"]["projectId"] == self.project_id
assert dataset_dict["_properties"]["datasetReference"]["datasetId"] == self.dataset_id

def tearDown(self):
self.delete_dataset()

def delete_dataset(self):
try:
self.client.delete_dataset(
dataset=self.dataset_id,
delete_contents=True
)
        except Exception as e:
            logger.warning(e)
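
The delete_dataset helper above swallows any exception so that cleanup never fails when the dataset is already gone. For reference, Client.delete_dataset also accepts a not_found_ok flag that achieves the same effect without the try/except; a sketch of that alternative (the project id is illustrative):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-gcp-project")  # hypothetical project id

    # delete_contents=True drops the dataset's tables first; not_found_ok=True
    # makes the call a no-op when the dataset does not exist.
    client.delete_dataset(
        dataset="target_bigquery_unit_test_ensure_dataset_function",
        delete_contents=True,
        not_found_ok=True,
    )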
