Skip to content

Commit

Permalink
add location config to bigquery fastsync (#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmriego authored Jul 15, 2021
1 parent f715cc2 commit d6c80f1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
3 changes: 2 additions & 1 deletion pipelinewise/fastsync/commons/target_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def __init__(self, connection_config, transformation_config=None):

def open_connection(self):
project_id = self.connection_config['project_id']
return bigquery.Client(project=project_id)
location = self.connection_config.get('location', None)
return bigquery.Client(project=project_id, location=location)

def query(self, query, params=None):
def to_query_parameter(value):
Expand Down
23 changes: 12 additions & 11 deletions tests/units/fastsync/commons/test_fastsync_target_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,24 @@ class TestFastSyncTargetBigquery:
"""
def setup_method(self):
"""Initialise test FastSyncTargetPostgres object"""
self.bigquery = FastSyncTargetBigqueryMock(connection_config={'project_id': 'dummy-project'},
self.bigquery = FastSyncTargetBigqueryMock(connection_config={'project_id': 'dummy-project', 'location': 'EU'},
transformation_config={})

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_create_schema(self, client, bigquery_job):
def test_open_connection(self, client):
"""Validate if create schema queries generated correctly"""
self.bigquery.open_connection()
client.assert_called_with(project='dummy-project', location='EU')

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_create_schema(self, client):
"""Validate if create schema queries generated correctly"""
client().query.return_value = bigquery_job
self.bigquery.create_schema('new_schema')
client().create_dataset.assert_called_with('new_schema', exists_ok=True)

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_drop_table(self, client, bigquery_job):
def test_drop_table(self, client):
"""Validate if drop table queries generated correctly"""
client().query.return_value = bigquery_job

self.bigquery.drop_table('test_schema', 'test_table')
client().query.assert_called_with(
'DROP TABLE IF EXISTS test_schema.`test_table`', job_config=ANY)
Expand All @@ -88,10 +91,8 @@ def test_drop_table(self, client, bigquery_job):
'DROP TABLE IF EXISTS test_schema.`test_table_with_space_temp`', job_config=ANY)

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_create_table(self, client, bigquery_job):
def test_create_table(self, client):
"""Validate if create table queries generated correctly"""
client().query.return_value = bigquery_job

# Create table with standard table and column names
self.bigquery.create_table(target_schema='test_schema',
table_name='test_table',
Expand Down Expand Up @@ -232,15 +233,15 @@ def test_grant_select_on_table(self, client, bigquery_job):
'GRANT SELECT ON test_schema.`table_with_space_and_uppercase` TO ROLE test_role', job_config=ANY)

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_grant_usage_on_schema(self, client, bigquery_job):
def test_grant_usage_on_schema(self, client):
"""Validate if GRANT command generated correctly"""
self.bigquery.grant_usage_on_schema(target_schema='test_schema',
role='test_role')
client().query.assert_called_with(
'GRANT USAGE ON SCHEMA test_schema TO ROLE test_role', job_config=ANY)

@patch('pipelinewise.fastsync.commons.target_bigquery.bigquery.Client')
def test_grant_select_on_schema(self, client, bigquery_job):
def test_grant_select_on_schema(self, client):
"""Validate if GRANT command generated correctly"""
self.bigquery.grant_select_on_schema(target_schema='test_schema',
role='test_role')
Expand Down

0 comments on commit d6c80f1

Please sign in to comment.