Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding helper functions for DB operations #24

Merged
merged 4 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ boto3 = "==1.34.117"
minio = "==7.2.7"
delta-spark = "==3.2.0" # should match JAR version (DELTA_SPARK_VER) specified in the Dockerfile
pandas = "==2.2.2"
pyarrow = "==16.1.0"

[dev-packages]
pytest = "==8.2.1"
Expand Down
68 changes: 55 additions & 13 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 71 additions & 0 deletions src/db_ops/spark_db_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
This module contains utility functions to interact with the Spark catalog.

"""

from pyspark.sql import SparkSession


def create_namespace_if_not_exists(
spark: SparkSession,
namespace: str = "default"
) -> None:

"""
Create a namespace in the Spark catalog if it does not exist.

:param spark: The Spark session.
:param namespace: The name of the namespace. Default is "default".
:return: None
"""

spark.sql(f"CREATE DATABASE IF NOT EXISTS {namespace}")
print(f"Namespace {namespace} is ready to use.")

Check warning on line 23 in src/db_ops/spark_db_utils.py

View check run for this annotation

Codecov / codecov/patch

src/db_ops/spark_db_utils.py#L22-L23

Added lines #L22 - L23 were not covered by tests


def table_exists(
spark: SparkSession,
table_name: str,
namespace: str = "default",
) -> bool:

"""
Check if a table exists in the Spark catalog.

:param spark: The Spark session.
:param table_name: The name of the table.
:param namespace: The namespace of the table. Default is "default".
:return: True if the table exists, False otherwise.
"""

spark_catalog = f"{namespace}.{table_name}"

Check warning on line 41 in src/db_ops/spark_db_utils.py

View check run for this annotation

Codecov / codecov/patch

src/db_ops/spark_db_utils.py#L41

Added line #L41 was not covered by tests

try:
spark.table(spark_catalog)
print(f"Table {spark_catalog} exists.")
return True
except Exception as e:
print(f"Table {spark_catalog} does not exist: {e}")
return False

Check warning on line 49 in src/db_ops/spark_db_utils.py

View check run for this annotation

Codecov / codecov/patch

src/db_ops/spark_db_utils.py#L43-L49

Added lines #L43 - L49 were not covered by tests


def remove_table(
spark: SparkSession,
table_name: str,
namespace: str = "default",
) -> None:

"""
Remove a table from the Spark catalog.

:param spark: The Spark session.
:param table_name: The name of the table.
:param namespace: The namespace of the table. Default is "default".
:return: None
"""

spark_catalog = f"{namespace}.{table_name}"

Check warning on line 67 in src/db_ops/spark_db_utils.py

View check run for this annotation

Codecov / codecov/patch

src/db_ops/spark_db_utils.py#L67

Added line #L67 was not covered by tests

spark.sql(f"DROP TABLE IF EXISTS {spark_catalog}")
print(f"Table {spark_catalog} removed.")

Check warning on line 70 in src/db_ops/spark_db_utils.py

View check run for this annotation

Codecov / codecov/patch

src/db_ops/spark_db_utils.py#L69-L70

Added lines #L69 - L70 were not covered by tests

1 change: 1 addition & 0 deletions src/notebook/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
"""

from spark.utils import get_spark_session
from db_ops.spark_db_utils import create_namespace_if_not_exists, table_exists, remove_table
1 change: 1 addition & 0 deletions test/src/db_ops/spark_db_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from src.db_ops.spark_db_utils import *
Loading