Skip to content

Commit

Permalink
Add helper functions for DB operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianhao-Gu committed Jun 6, 2024
1 parent 33c02dc commit 9100a10
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 13 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ boto3 = "==1.34.117"
minio = "==7.2.7"
delta-spark = "==3.2.0" # should match JAR version (DELTA_SPARK_VER) specified in the Dockerfile
pandas = "==2.2.2"
pyarrow = "==16.1.0"

[dev-packages]
pytest = "==8.2.1"
Expand Down
68 changes: 55 additions & 13 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 76 additions & 0 deletions src/db_ops/spark_db_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
This module contains utility functions to interact with the Spark catalog.
"""

from pyspark.sql import SparkSession


def create_namespace_if_not_exists(
        spark: SparkSession,
        namespace: str = "default"
) -> None:
    """
    Create a namespace in the Spark catalog if it does not exist.

    :param spark: The Spark session.
    :param namespace: The name of the namespace. Default is "default".
    :return: None
    :raises Exception: re-raises any failure from the underlying SQL call,
        after reporting it, so callers do not silently proceed against a
        namespace that was never created.
    """
    try:
        # CREATE DATABASE and CREATE NAMESPACE are synonyms in Spark SQL.
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {namespace}")
        print(f"Namespace {namespace} is ready to use.")
    except Exception as e:
        # Swallowing the error here would let callers continue as if the
        # namespace existed; report it, then surface the original failure.
        print(f"Error creating namespace {namespace}: {e}")
        raise


def table_exists(
        spark: SparkSession,
        table_name: str,
        namespace: str = "default",
) -> bool:
    """
    Check if a table exists in the Spark catalog.

    :param spark: The Spark session.
    :param table_name: The name of the table.
    :param namespace: The namespace of the table. Default is "default".
    :return: True if the table exists, False otherwise.
    """
    qualified_name = f"{namespace}.{table_name}"

    # Ask the catalog directly (available since Spark 3.3) instead of loading
    # the table and treating any exception as "missing": that approach also
    # reported False on unrelated failures (permissions, connectivity) and
    # masked the real cause.
    exists = spark.catalog.tableExists(qualified_name)
    if exists:
        print(f"Table {qualified_name} exists.")
    else:
        print(f"Table {qualified_name} does not exist.")
    return exists


def remove_table(
        spark: SparkSession,
        table_name: str,
        namespace: str = "default",
) -> None:
    """
    Remove a table from the Spark catalog.

    :param spark: The Spark session.
    :param table_name: The name of the table.
    :param namespace: The namespace of the table. Default is "default".
    :return: None
    :raises Exception: re-raises any failure from the DROP TABLE call, after
        reporting it, so callers learn the table may still be present.
    """
    qualified_name = f"{namespace}.{table_name}"

    try:
        # IF EXISTS makes dropping a missing table a no-op, so any exception
        # here is a genuine failure, not an absent table.
        spark.sql(f"DROP TABLE IF EXISTS {qualified_name}")
        print(f"Table {qualified_name} removed.")
    except Exception as e:
        # Report, then propagate — silently swallowing would hide the fact
        # that the table was not actually removed.
        print(f"Error removing table {qualified_name}: {e}")
        raise

1 change: 1 addition & 0 deletions test/src/db_ops/spark_db_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Import the helpers explicitly: wildcard imports hide the module's public
# surface from linters and readers, and can silently shadow local names.
from db_ops.spark_db_utils import (
    create_namespace_if_not_exists,
    remove_table,
    table_exists,
)

0 comments on commit 9100a10

Please sign in to comment.