diff --git a/parsons/google/google_admin.py b/parsons/google/google_admin.py index cf5c730929..b69cd5b3d9 100644 --- a/parsons/google/google_admin.py +++ b/parsons/google/google_admin.py @@ -1,9 +1,12 @@ -from oauth2client.service_account import ServiceAccountCredentials +import uuid + +from google.auth.transport.requests import AuthorizedSession + from parsons.etl.table import Table -from parsons.google.utitities import setup_google_application_credentials -import httplib2 -import json -import os +from parsons.google.utilities import ( + load_google_application_credentials, + setup_google_application_credentials, +) class GoogleAdmin(object): @@ -23,17 +26,16 @@ class GoogleAdmin(object): """ def __init__(self, app_creds=None, sub=None): - setup_google_application_credentials(app_creds) - - self.client = ( - ServiceAccountCredentials.from_json_keyfile_name( - os.environ["GOOGLE_APPLICATION_CREDENTIALS"], - ["https://www.googleapis.com/auth/admin.directory.group"], - ) - .create_delegated(sub) - .authorize(httplib2.Http()) + env_credentials_path = str(uuid.uuid4()) + setup_google_application_credentials(app_creds, target_env_var_name=env_credentials_path) + credentials = load_google_application_credentials( + env_credentials_path, + scopes=["https://www.googleapis.com/auth/admin.directory.group"], + subject=sub, ) + self.client = AuthorizedSession(credentials) + def _paginate_request(self, endpoint, collection, params=None): # Build query params param_arr = [] @@ -48,7 +50,9 @@ def _paginate_request(self, endpoint, collection, params=None): # Return type from Google Admin is a tuple of length 2. Extract desired result from 2nd item # in tuple and convert to json - res = json.loads(self.client.request(req_url + param_str, "GET")[1].decode("utf-8")) + res = self.client.request("GET", req_url + param_str).json() + if "error" in res: + raise RuntimeError(res["error"].get("message")) # Paginate ret = [] @@ -60,12 +64,10 @@ def _paginate_request(self, endpoint, collection, params=None): param_arr.append("pageToken=" + res["nextPageToken"]) else: param_arr[-1] = "pageToken=" + res["nextPageToken"] - res = json.loads( - self.client.request(req_url + "?" + "&".join(param_arr), "GET")[1].decode( - "utf-8" - ) - ) - ret += res[collection] + response = self.client.request("GET", req_url + "?" + "&".join(param_arr)).json() + if "error" in response: + raise RuntimeError(response["error"].get("message")) + ret += response[collection] return Table(ret) diff --git a/parsons/google/google_bigquery.py b/parsons/google/google_bigquery.py index c606242755..5d52bf091a 100644 --- a/parsons/google/google_bigquery.py +++ b/parsons/google/google_bigquery.py @@ -17,7 +17,10 @@ from parsons.databases.table import BaseTable from parsons.etl import Table from parsons.google.google_cloud_storage import GoogleCloudStorage -from parsons.google.utitities import setup_google_application_credentials +from parsons.google.utilities import ( + load_google_application_credentials, + setup_google_application_credentials, +) from parsons.utilities import check_env from parsons.utilities.files import create_temp_file @@ -160,8 +163,11 @@ def __init__( if isinstance(app_creds, Credentials): self.credentials = app_creds else: - self.credentials = None - setup_google_application_credentials(app_creds) + self.env_credential_path = str(uuid.uuid4()) + setup_google_application_credentials( + app_creds, target_env_var_name=self.env_credential_path + ) + self.credentials = load_google_application_credentials(self.env_credential_path) self.project = project self.location = location @@ -695,7 +701,7 @@ def copy_s3( # copy from S3 to GCS tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket) - gcs_client = gcs_client or GoogleCloudStorage() + gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds) temp_blob_uri = gcs_client.copy_s3_to_gcs( aws_source_bucket=bucket, aws_access_key_id=aws_access_key_id, @@ -808,7 +814,7 @@ def copy( schema.append(schema_row) job_config.schema = schema - gcs_client = gcs_client or GoogleCloudStorage() + gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds) temp_blob_name = f"{uuid.uuid4()}.{data_type}" temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name) diff --git a/parsons/google/google_cloud_storage.py b/parsons/google/google_cloud_storage.py index e88b0fe0b0..bb2462614a 100644 --- a/parsons/google/google_cloud_storage.py +++ b/parsons/google/google_cloud_storage.py @@ -1,8 +1,3 @@ -import google -from google.cloud import storage -from google.cloud import storage_transfer -from parsons.google.utitities import setup_google_application_credentials -from parsons.utilities import files import datetime import gzip import petl @@ -12,6 +7,15 @@ import zipfile from typing import Optional +import google +from google.cloud import storage, storage_transfer + +from parsons.google.utilities import ( + load_google_application_credentials, + setup_google_application_credentials, +) +from parsons.utilities import files + logger = logging.getLogger(__name__) @@ -42,17 +46,21 @@ class GoogleCloudStorage(object): """ def __init__(self, app_creds=None, project=None): - setup_google_application_credentials(app_creds) + env_credentials_path = str(uuid.uuid4()) + setup_google_application_credentials( + app_creds, target_env_var_name=env_credentials_path + ) + credentials = load_google_application_credentials(env_credentials_path) self.project = project # Throws an error if you pass project=None, so adding if/else statement. if not self.project: - self.client = storage.Client() + self.client = storage.Client(credentials=credentials) """ Access all methods of `google.cloud` package """ else: - self.client = storage.Client(project=self.project) + self.client = storage.Client(credentials=credentials, project=self.project) def list_buckets(self): """ @@ -289,7 +297,9 @@ def delete_blob(self, bucket_name, blob_name): blob.delete() logger.info(f"{blob_name} blob in {bucket_name} bucket deleted.") - def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_acl=None): + def upload_table( + self, table, bucket_name, blob_name, data_type="csv", default_acl=None + ): """ Load the data from a Parsons table into a blob. @@ -325,7 +335,9 @@ def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_a local_file = table.to_json() content_type = "application/json" else: - raise ValueError(f"Unknown data_type value ({data_type}): must be one of: csv or json") + raise ValueError( + f"Unknown data_type value ({data_type}): must be one of: csv or json" + ) try: blob.upload_from_filename( @@ -395,7 +407,9 @@ def copy_bucket_to_gcs( Secret key to authenticate storage transfer """ if source not in ["gcs", "s3"]: - raise ValueError(f"Blob transfer only supports gcs and s3 sources [source={source}]") + raise ValueError( + f"Blob transfer only supports gcs and s3 sources [source={source}]" + ) if source_path and source_path[-1] != "/": raise ValueError("Source path much end in a '/'") @@ -582,9 +596,13 @@ def unzip_blob( } file_extension = compression_params[compression_type]["file_extension"] - compression_function = compression_params[compression_type]["compression_function"] + compression_function = compression_params[compression_type][ + "compression_function" + ] - compressed_filepath = self.download_blob(bucket_name=bucket_name, blob_name=blob_name) + compressed_filepath = self.download_blob( + bucket_name=bucket_name, blob_name=blob_name + ) decompressed_filepath = compressed_filepath.replace(file_extension, "") decompressed_blob_name = ( @@ -616,7 +634,9 @@ def __gzip_decompress_and_write_to_gcs(self, **kwargs): bucket_name = kwargs.pop("bucket_name") with gzip.open(compressed_filepath, "rb") as f_in: - logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}") + logger.debug( + f"Uploading uncompressed file to GCS: {decompressed_blob_name}" + ) bucket = self.get_bucket(bucket_name=bucket_name) blob = storage.Blob(name=decompressed_blob_name, bucket=bucket) blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600) @@ -636,7 +656,9 @@ def __zip_decompress_and_write_to_gcs(self, **kwargs): with zipfile.ZipFile(compressed_filepath) as path_: # Open the underlying file with path_.open(decompressed_blob_in_archive) as f_in: - logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}") + logger.debug( + f"Uploading uncompressed file to GCS: {decompressed_blob_name}" + ) bucket = self.get_bucket(bucket_name=bucket_name) blob = storage.Blob(name=decompressed_blob_name, bucket=bucket) blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600) diff --git a/parsons/google/google_sheets.py b/parsons/google/google_sheets.py index 64fe25d75e..495d7cf759 100644 --- a/parsons/google/google_sheets.py +++ b/parsons/google/google_sheets.py @@ -1,12 +1,14 @@ -import os -import json import logging - -from parsons.etl.table import Table -from parsons.google.utitities import setup_google_application_credentials, hexavigesimal +import uuid import gspread -from google.oauth2.service_account import Credentials + +from parsons.etl.table import Table +from parsons.google.utilities import ( + load_google_application_credentials, + setup_google_application_credentials, + hexavigesimal, +) logger = logging.getLogger(__name__) @@ -32,12 +34,15 @@ def __init__(self, google_keyfile_dict=None, subject=None): "https://www.googleapis.com/auth/drive", ] - setup_google_application_credentials(google_keyfile_dict, "GOOGLE_DRIVE_CREDENTIALS") - google_credential_file = open(os.environ["GOOGLE_DRIVE_CREDENTIALS"]) - credentials_dict = json.load(google_credential_file) + env_credentials_path = str(uuid.uuid4()) + setup_google_application_credentials( + google_keyfile_dict, + "GOOGLE_DRIVE_CREDENTIAL", + target_env_var_name=env_credentials_path, + ) - credentials = Credentials.from_service_account_info( - credentials_dict, scopes=scope, subject=subject + credentials = load_google_application_credentials( + env_credentials_path, scopes=scope, subject=subject ) self.gspread_client = gspread.authorize(credentials) @@ -47,12 +52,16 @@ def _get_worksheet(self, spreadsheet_id, worksheet=0): # Check if the worksheet is an integer, if so find the sheet by index if isinstance(worksheet, int): - return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(worksheet) + return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet( + worksheet + ) elif isinstance(worksheet, str): idx = self.list_worksheets(spreadsheet_id).index(worksheet) try: - return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(idx) + return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet( + idx + ) except: # noqa: E722 raise ValueError(f"Couldn't find worksheet {worksheet}") @@ -280,7 +289,9 @@ def append_to_sheet( # If the existing sheet is blank, then just overwrite the table. if existing_table.num_rows == 0: - return self.overwrite_sheet(spreadsheet_id, table, worksheet, user_entered_value) + return self.overwrite_sheet( + spreadsheet_id, table, worksheet, user_entered_value + ) cells = [] for row_num, row in enumerate(table.data): diff --git a/parsons/google/utitities.py b/parsons/google/utilities.py similarity index 62% rename from parsons/google/utitities.py rename to parsons/google/utilities.py index 75cdf01de0..ca548ea9f3 100644 --- a/parsons/google/utitities.py +++ b/parsons/google/utilities.py @@ -1,13 +1,17 @@ -import typing as t -from parsons.utilities import files -from parsons.utilities import check_env import json import os +import typing as t + +import google +from google.oauth2 import service_account + +from parsons.utilities import check_env, files def setup_google_application_credentials( app_creds: t.Union[t.Dict, str, None], env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS", + target_env_var_name: t.Optional[str] = None, ) -> None: # Detect if app_creds is a dict, path string or json string, and if it is a # json string, then convert it to a temporary file. Then set the @@ -21,7 +25,10 @@ def setup_google_application_credentials( except ValueError: creds_path = credentials - os.environ[env_var_name] = creds_path + if not target_env_var_name: + target_env_var_name = env_var_name + + os.environ[target_env_var_name] = creds_path def hexavigesimal(n: int) -> str: @@ -48,3 +55,22 @@ def hexavigesimal(n: int) -> str: chars = chr((n - 1) % 26 + 65) + chars # 65 makes us start at A n = (n - 1) // 26 return chars + + +def load_google_application_credentials( + env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS", + scopes: t.Optional[t.List[str]] = None, + subject: t.Optional[str] = None, +) -> google.auth.credentials.Credentials: + + service_account_filepath = os.environ[env_var_name] + + credentials = service_account.Credentials.from_service_account_file(service_account_filepath) + + if scopes: + credentials = credentials.with_scopes(scopes) + + if subject: + credentials = credentials.with_subject(subject) + + return credentials diff --git a/test/test_databases/test_bigquery.py b/test/test_databases/test_bigquery.py index 080bc709a9..05e7671ceb 100644 --- a/test/test_databases/test_bigquery.py +++ b/test/test_databases/test_bigquery.py @@ -2,18 +2,27 @@ import os import unittest.mock as mock from test.test_google.test_utilities import FakeCredentialTest +from typing import Union from google.cloud import bigquery, exceptions -from parsons import GoogleBigQuery as BigQuery +from parsons import GoogleBigQuery from parsons import Table from parsons.google.google_cloud_storage import GoogleCloudStorage +class BigQuery(GoogleBigQuery): + @mock.patch("parsons.google.google_bigquery.load_google_application_credentials") + def __init__(self, load_creds_mock, app_creds=None, **kwargs): + super().__init__(app_creds=app_creds, **kwargs) + + class FakeClient: """A Fake Storage Client used for monkey-patching.""" - def __init__(self, project=None): + @mock.patch("parsons.google.google_bigquery.load_google_application_credentials") + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") + def __init__(self, load_creds_mock, load_creds_mock_2, project=None, credentials=None): self.project = project @@ -21,7 +30,8 @@ class FakeGoogleCloudStorage(GoogleCloudStorage): """A Fake GoogleCloudStorage object used to test setting up credentials.""" @mock.patch("google.cloud.storage.Client", FakeClient) - def __init__(self): + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") + def __init__(self, load_creds_mock): super().__init__(None, None) def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_acl=None): @@ -238,6 +248,7 @@ def test_copy_gcs__bad_if_exists(self): ) @mock.patch("google.cloud.storage.Client") + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") @mock.patch.object(GoogleCloudStorage, "split_uri", return_value=("tmp", "file.gzip")) @mock.patch.object(GoogleCloudStorage, "unzip_blob", return_value="gs://tmp/file.csv") def test_copy_large_compressed_file_from_gcs( @@ -355,9 +366,13 @@ def test_copy(self): self.assertEqual(delete_call_args[0][0], self.tmp_gcs_bucket) self.assertEqual(delete_call_args[0][1], tmp_blob_name) - def test_copy__credentials_are_correctly_set(self): + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") + @mock.patch("parsons.google.google_bigquery.load_google_application_credentials") + def test_copy__credentials_are_correctly_set__from_filepath( + self, load_creds_mock, load_creds_mock_2 + ): tbl = self.default_table - bq = self._build_mock_client_for_copying(table_exists=False) + bq = self._build_mock_client_for_copying(table_exists=False, app_creds=self.cred_path) # Pass in our fake GCS Client. bq.copy( @@ -367,7 +382,61 @@ def test_copy__credentials_are_correctly_set(self): gcs_client=FakeGoogleCloudStorage(), ) - actual = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + actual = os.environ[bq.env_credential_path] + + with open(actual, "r") as factual: + with open(self.cred_path, "r") as fexpected: + actual_str = factual.read() + self.assertEqual(actual_str, fexpected.read()) + self.assertEqual(self.cred_contents, json.loads(actual_str)) + + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") + @mock.patch("parsons.google.google_bigquery.load_google_application_credentials") + def test_copy__credentials_are_correctly_set__from_env( + self, load_creds_mock, load_creds_mock_2 + ): + tbl = self.default_table + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.cred_path + + bq = self._build_mock_client_for_copying( + table_exists=False, + ) + + # Pass in our fake GCS Client. + bq.copy( + tbl, + "dataset.table", + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=FakeGoogleCloudStorage(), + ) + + actual = os.environ[bq.env_credential_path] + + with open(actual, "r") as factual: + with open(self.cred_path, "r") as fexpected: + actual_str = factual.read() + self.assertEqual(actual_str, fexpected.read()) + self.assertEqual(self.cred_contents, json.loads(actual_str)) + + @mock.patch("parsons.google.google_cloud_storage.load_google_application_credentials") + @mock.patch("parsons.google.google_bigquery.load_google_application_credentials") + def test_copy__credentials_are_correctly_set__from_dict( + self, load_creds_mock, load_creds_mock_2 + ): + tbl = self.default_table + with open(self.cred_path) as file: + cred_dict = json.loads(file.read()) + bq = self._build_mock_client_for_copying(table_exists=False, app_creds=cred_dict) + + # Pass in our fake GCS Client. + bq.copy( + tbl, + "dataset.table", + tmp_gcs_bucket=self.tmp_gcs_bucket, + gcs_client=FakeGoogleCloudStorage(), + ) + + actual = os.environ[bq.env_credential_path] with open(actual, "r") as factual: with open(self.cred_path, "r") as fexpected: @@ -511,11 +580,13 @@ def _build_mock_client_for_querying(self, results): bq._dbapi = dbapi return bq - def _build_mock_client_for_copying(self, table_exists=True): + def _build_mock_client_for_copying( + self, table_exists=True, app_creds: Union[str, dict, None] = None + ): bq_client = mock.MagicMock() if not table_exists: bq_client.get_table.side_effect = exceptions.NotFound("not found") - bq = BigQuery() + bq = BigQuery(app_creds=app_creds) bq._client = bq_client return bq diff --git a/test/test_google/test_google_admin.py b/test/test_google/test_google_admin.py index 3f746f635f..4951892a67 100644 --- a/test/test_google/test_google_admin.py +++ b/test/test_google/test_google_admin.py @@ -31,36 +31,41 @@ def setUp(self): self.google_admin = MockGoogleAdmin() def test_aliases(self): - self.google_admin.client.request = MagicMock( - return_value=( - "", - '{"aliases": [{"alias": "fakeemail7@fakedomain.com"},' - '{"alias": "fakeemail8@fakedomain' - '.com"}]}'.encode(), - ) - ) + response_mock = MagicMock() + self.google_admin.client.request = MagicMock(return_value=response_mock) + response_mock.json.return_value = { + "aliases": [ + {"alias": "fakeemail7@fakedomain.com"}, + {"alias": "fakeemail8@fakedomain.com"}, + ] + } assert_matching_tables(self.google_admin.get_aliases("1"), self.mock_aliases) def test_all_group_members(self): - self.google_admin.client.request = MagicMock( - return_value=( - "", - '{"members": [{"email": "fakeemail4@fakedomain.com"}]}'.encode(), - ) - ) + response_mock = MagicMock() + self.google_admin.client.request = MagicMock(return_value=response_mock) + response_mock.json.return_value = {"members": [{"email": "fakeemail4@fakedomain.com"}]} assert_matching_tables( self.google_admin.get_all_group_members("1"), self.mock_all_group_members ) def test_all_groups(self): - self.google_admin.client.request = MagicMock( - return_value=( - "", - '{"groups": [{"aliases": ["fakeemail7@fakedomain.com", "fakeemail8@fakedomain.com"], "e' # noqa: E501 - 'mail": "fakeemail4@fakedomain.com", "id": 1}, {"email": "fakeemail5@fakedomain.com", "' # noqa: E501 - 'id": 2}, {"email": "fakeemail6@fakedomain.com", "id": 3}]}'.encode(), - ) - ) + response_mock = MagicMock() + self.google_admin.client.request = MagicMock(return_value=response_mock) + response_mock.json.return_value = { + "groups": [ + { + "aliases": [ + "fakeemail7@fakedomain.com", + "fakeemail8@fakedomain.com", + ], + "email": "fakeemail4@fakedomain.com", + "id": 1, + }, + {"email": "fakeemail5@fakedomain.com", "id": 2}, + {"email": "fakeemail6@fakedomain.com", "id": 3}, + ] + } assert_matching_tables( self.google_admin.get_all_groups({"domain": "fakedomain.com"}), self.mock_all_groups, diff --git a/test/test_google/test_utilities.py b/test/test_google/test_utilities.py index accb506f90..24c8a7bc47 100644 --- a/test/test_google/test_utilities.py +++ b/test/test_google/test_utilities.py @@ -1,9 +1,9 @@ import json -import unittest import os import tempfile +import unittest -from parsons.google import utitities as util +from parsons.google import utilities as util class FakeCredentialTest(unittest.TestCase):