From 93c406bc2c091217d960cd590fe489dd06646853 Mon Sep 17 00:00:00 2001 From: Tianhao-Gu Date: Tue, 27 Aug 2024 12:31:29 -0500 Subject: [PATCH] add optional arg for minio_client --- src/minio_utils/minio_utils.py | 23 ++++++++++++++++++----- src/spark/utils.py | 8 +++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/minio_utils/minio_utils.py b/src/minio_utils/minio_utils.py index 142c8f4..d371667 100644 --- a/src/minio_utils/minio_utils.py +++ b/src/minio_utils/minio_utils.py @@ -7,15 +7,28 @@ from minio import Minio -def get_minio_client() -> Minio: +def get_minio_client( + minio_url: str = None, + access_key: str = None, + secret_key: str = None, + secure: bool = False +) -> Minio: """ Helper function to get the Minio client. + :param minio_url: URL for the Minio server (environment variable used if not provided) + :param access_key: Access key for Minio (environment variable used if not provided) + :param secret_key: Secret key for Minio (environment variable used if not provided) + :param secure: Whether to use HTTPS (optional, default is False) :return: A Minio client object """ + minio_url = minio_url or os.environ['MINIO_URL'].replace("http://", "") + access_key = access_key or os.environ['MINIO_ACCESS_KEY'] + secret_key = secret_key or os.environ['MINIO_SECRET_KEY'] + return Minio( - os.environ['MINIO_URL'].replace("http://", ""), - access_key=os.environ['MINIO_ACCESS_KEY'], - secret_key=os.environ['MINIO_SECRET_KEY'], - secure=False + minio_url, + access_key=access_key, + secret_key=secret_key, + secure=secure ) diff --git a/src/spark/utils.py b/src/spark/utils.py index 595a8b3..0e78212 100644 --- a/src/spark/utils.py +++ b/src/spark/utils.py @@ -170,6 +170,9 @@ def read_csv( path: str, header: bool = True, sep: str = None, + minio_url: str = None, + access_key: str = None, + secret_key: str = None, **kwargs ) -> DataFrame: """ @@ -179,13 +182,16 @@ def read_csv( :param path: The minIO path to the CSV file. e.g. s3a://bucket-name/file.csv or bucket-name/file.csv :param header: Whether the CSV file has a header. Default is True. :param sep: The delimiter to use. If not provided, the function will try to detect it. + :param minio_url: The minIO URL. Default is None (environment variable used if not provided). + :param access_key: The minIO access key. Default is None (environment variable used if not provided). + :param secret_key: The minIO secret key. Default is None (environment variable used if not provided). :param kwargs: Additional arguments to pass to spark.read.csv. :return: A DataFrame. """ if not sep: - client = get_minio_client() + client = get_minio_client(minio_url=minio_url, access_key=access_key, secret_key=secret_key) bucket, key = path.replace("s3a://", "").split("/", 1) obj = client.get_object(bucket, key) sample = obj.read(8192).decode()