Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass credentials directly to GCP connectors rather than through environment variable #1040

Merged
merged 7 commits into from
Jul 12, 2024
44 changes: 23 additions & 21 deletions parsons/google/google_admin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from oauth2client.service_account import ServiceAccountCredentials
import uuid

from google.auth.transport.requests import AuthorizedSession

from parsons.etl.table import Table
from parsons.google.utitities import setup_google_application_credentials
import httplib2
import json
import os
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)


class GoogleAdmin(object):
Expand All @@ -23,17 +26,16 @@ class GoogleAdmin(object):
"""

def __init__(self, app_creds=None, sub=None):
setup_google_application_credentials(app_creds)

self.client = (
ServiceAccountCredentials.from_json_keyfile_name(
os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
["https://www.googleapis.com/auth/admin.directory.group"],
)
.create_delegated(sub)
.authorize(httplib2.Http())
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(app_creds, target_env_var_name=env_credentials_path)
credentials = load_google_application_credentials(
env_credentials_path,
scopes=["https://www.googleapis.com/auth/admin.directory.group"],
subject=sub,
)

self.client = AuthorizedSession(credentials)

def _paginate_request(self, endpoint, collection, params=None):
# Build query params
param_arr = []
Expand All @@ -48,7 +50,9 @@ def _paginate_request(self, endpoint, collection, params=None):

# Return type from Google Admin is a tuple of length 2. Extract desired result from 2nd item
# in tuple and convert to json
res = json.loads(self.client.request(req_url + param_str, "GET")[1].decode("utf-8"))
res = self.client.request("GET", req_url + param_str).json()
if "error" in res:
raise RuntimeError(res["error"].get("message"))

# Paginate
ret = []
Expand All @@ -60,12 +64,10 @@ def _paginate_request(self, endpoint, collection, params=None):
param_arr.append("pageToken=" + res["nextPageToken"])
else:
param_arr[-1] = "pageToken=" + res["nextPageToken"]
res = json.loads(
self.client.request(req_url + "?" + "&".join(param_arr), "GET")[1].decode(
"utf-8"
)
)
ret += res[collection]
response = self.client.request("GET", req_url + "?" + "&".join(param_arr)).json()
if "error" in response:
raise RuntimeError(response["error"].get("message"))
ret += response[collection]

return Table(ret)

Expand Down
16 changes: 11 additions & 5 deletions parsons/google/google_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

Expand Down Expand Up @@ -160,8 +163,11 @@ def __init__(
if isinstance(app_creds, Credentials):
self.credentials = app_creds
else:
self.credentials = None
setup_google_application_credentials(app_creds)
self.env_credential_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=self.env_credential_path
)
self.credentials = load_google_application_credentials(self.env_credential_path)

self.project = project
self.location = location
Expand Down Expand Up @@ -695,7 +701,7 @@ def copy_s3(

# copy from S3 to GCS
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_uri = gcs_client.copy_s3_to_gcs(
aws_source_bucket=bucket,
aws_access_key_id=aws_access_key_id,
Expand Down Expand Up @@ -808,7 +814,7 @@ def copy(
schema.append(schema_row)
job_config.schema = schema

gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_name = f"{uuid.uuid4()}.{data_type}"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)

Expand Down
52 changes: 37 additions & 15 deletions parsons/google/google_cloud_storage.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
import google
from google.cloud import storage
from google.cloud import storage_transfer
from parsons.google.utitities import setup_google_application_credentials
from parsons.utilities import files
import datetime
import gzip
import petl
Expand All @@ -12,6 +7,15 @@
import zipfile
from typing import Optional

import google
from google.cloud import storage, storage_transfer

from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import files

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -42,17 +46,21 @@ class GoogleCloudStorage(object):
"""

def __init__(self, app_creds=None, project=None):
setup_google_application_credentials(app_creds)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=env_credentials_path
)
credentials = load_google_application_credentials(env_credentials_path)
self.project = project

# Throws an error if you pass project=None, so adding if/else statement.
if not self.project:
self.client = storage.Client()
self.client = storage.Client(credentials=credentials)
"""
Access all methods of `google.cloud` package
"""
else:
self.client = storage.Client(project=self.project)
self.client = storage.Client(credentials=credentials, project=self.project)

def list_buckets(self):
"""
Expand Down Expand Up @@ -289,7 +297,9 @@ def delete_blob(self, bucket_name, blob_name):
blob.delete()
logger.info(f"{blob_name} blob in {bucket_name} bucket deleted.")

def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_acl=None):
def upload_table(
self, table, bucket_name, blob_name, data_type="csv", default_acl=None
):
"""
Load the data from a Parsons table into a blob.

Expand Down Expand Up @@ -325,7 +335,9 @@ def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_a
local_file = table.to_json()
content_type = "application/json"
else:
raise ValueError(f"Unknown data_type value ({data_type}): must be one of: csv or json")
raise ValueError(
f"Unknown data_type value ({data_type}): must be one of: csv or json"
)

try:
blob.upload_from_filename(
Expand Down Expand Up @@ -395,7 +407,9 @@ def copy_bucket_to_gcs(
Secret key to authenticate storage transfer
"""
if source not in ["gcs", "s3"]:
raise ValueError(f"Blob transfer only supports gcs and s3 sources [source={source}]")
raise ValueError(
f"Blob transfer only supports gcs and s3 sources [source={source}]"
)
if source_path and source_path[-1] != "/":
raise ValueError("Source path much end in a '/'")

Expand Down Expand Up @@ -582,9 +596,13 @@ def unzip_blob(
}

file_extension = compression_params[compression_type]["file_extension"]
compression_function = compression_params[compression_type]["compression_function"]
compression_function = compression_params[compression_type][
"compression_function"
]

compressed_filepath = self.download_blob(bucket_name=bucket_name, blob_name=blob_name)
compressed_filepath = self.download_blob(
bucket_name=bucket_name, blob_name=blob_name
)

decompressed_filepath = compressed_filepath.replace(file_extension, "")
decompressed_blob_name = (
Expand Down Expand Up @@ -616,7 +634,9 @@ def __gzip_decompress_and_write_to_gcs(self, **kwargs):
bucket_name = kwargs.pop("bucket_name")

with gzip.open(compressed_filepath, "rb") as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
Expand All @@ -636,7 +656,9 @@ def __zip_decompress_and_write_to_gcs(self, **kwargs):
with zipfile.ZipFile(compressed_filepath) as path_:
# Open the underlying file
with path_.open(decompressed_blob_in_archive) as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
39 changes: 25 additions & 14 deletions parsons/google/google_sheets.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import json
import logging

from parsons.etl.table import Table
from parsons.google.utitities import setup_google_application_credentials, hexavigesimal
import uuid

import gspread
from google.oauth2.service_account import Credentials

from parsons.etl.table import Table
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
hexavigesimal,
)

logger = logging.getLogger(__name__)

Expand All @@ -32,12 +34,15 @@ def __init__(self, google_keyfile_dict=None, subject=None):
"https://www.googleapis.com/auth/drive",
]

setup_google_application_credentials(google_keyfile_dict, "GOOGLE_DRIVE_CREDENTIALS")
google_credential_file = open(os.environ["GOOGLE_DRIVE_CREDENTIALS"])
credentials_dict = json.load(google_credential_file)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
google_keyfile_dict,
"GOOGLE_DRIVE_CREDENTIAL",
target_env_var_name=env_credentials_path,
)

credentials = Credentials.from_service_account_info(
credentials_dict, scopes=scope, subject=subject
credentials = load_google_application_credentials(
env_credentials_path, scopes=scope, subject=subject
)

self.gspread_client = gspread.authorize(credentials)
Expand All @@ -47,12 +52,16 @@ def _get_worksheet(self, spreadsheet_id, worksheet=0):

# Check if the worksheet is an integer, if so find the sheet by index
if isinstance(worksheet, int):
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(worksheet)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
worksheet
)

elif isinstance(worksheet, str):
idx = self.list_worksheets(spreadsheet_id).index(worksheet)
try:
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(idx)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
idx
)
except: # noqa: E722
raise ValueError(f"Couldn't find worksheet {worksheet}")

Expand Down Expand Up @@ -280,7 +289,9 @@ def append_to_sheet(

# If the existing sheet is blank, then just overwrite the table.
if existing_table.num_rows == 0:
return self.overwrite_sheet(spreadsheet_id, table, worksheet, user_entered_value)
return self.overwrite_sheet(
spreadsheet_id, table, worksheet, user_entered_value
)

cells = []
for row_num, row in enumerate(table.data):
Expand Down
34 changes: 30 additions & 4 deletions parsons/google/utitities.py → parsons/google/utilities.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import typing as t
from parsons.utilities import files
from parsons.utilities import check_env
import json
import os
import typing as t

import google
from google.oauth2 import service_account

from parsons.utilities import check_env, files


def setup_google_application_credentials(
app_creds: t.Union[t.Dict, str, None],
env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS",
target_env_var_name: t.Optional[str] = None,
) -> None:
# Detect if app_creds is a dict, path string or json string, and if it is a
# json string, then convert it to a temporary file. Then set the
Expand All @@ -21,7 +25,10 @@ def setup_google_application_credentials(
except ValueError:
creds_path = credentials

os.environ[env_var_name] = creds_path
if not target_env_var_name:
target_env_var_name = env_var_name

os.environ[target_env_var_name] = creds_path


def hexavigesimal(n: int) -> str:
Expand All @@ -48,3 +55,22 @@ def hexavigesimal(n: int) -> str:
chars = chr((n - 1) % 26 + 65) + chars # 65 makes us start at A
n = (n - 1) // 26
return chars


def load_google_application_credentials(
env_var_name: str = "GOOGLE_APPLICATION_CREDENTIALS",
scopes: t.Optional[t.List[str]] = None,
subject: t.Optional[str] = None,
) -> google.auth.credentials.Credentials:

service_account_filepath = os.environ[env_var_name]

credentials = service_account.Credentials.from_service_account_file(service_account_filepath)

if scopes:
credentials = credentials.with_scopes(scopes)

if subject:
credentials = credentials.with_subject(subject)

return credentials
Loading
Loading