support for pyspark connection method #308
@@ -0,0 +1,93 @@
from __future__ import annotations

import datetime as dt
import importlib

import dbt.exceptions
from dbt.events import AdapterLogger
from dbt.utils import DECIMALS

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

import sqlalchemy

logger = AdapterLogger("Spark")
NUMBERS = DECIMALS + (int, float)
class PysparkConnectionWrapper(object):
    """Wrap a Spark context"""

    def __init__(self, python_module):
        self.result = None
        if python_module:
Review comment: I prefer to avoid such a hook, it's very specific. We could add docs about this, still it is confusing to write.
Reply: I can change it, what do you propose?
            logger.debug(f"Loading spark context from python module {python_module}")
            module = importlib.import_module(python_module)
            create_spark_context = getattr(module, "create_spark_context")
            self.spark = create_spark_context()
        else:
            # Create a default pyspark context
            self.spark = SparkSession.builder.getOrCreate()
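For illustration, the hook discussed above only requires the user-supplied module to expose a module-level create_spark_context() callable that returns a SparkSession. A minimal sketch (module name and configs are hypothetical, not part of this PR):

# my_project/spark_context.py -- hypothetical hook module; the wrapper above
# imports it by name and calls create_spark_context() to obtain the session.
from pyspark.sql import SparkSession

def create_spark_context():
    # Any project-specific session setup (app name, confs, catalogs) goes here.
    return (
        SparkSession.builder
        .appName("dbt-pyspark")
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .getOrCreate()
    )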
    def cursor(self):
        return self

    def rollback(self, *args, **kwargs):
        logger.debug("NotImplemented: rollback")
    def fetchall(self):
        try:
            rows = self.result.collect()
            logger.debug(rows)
        except Exception as e:
            logger.debug(f"raising error {e}")
            dbt.exceptions.raise_database_error(e)
        return rows
    def execute(self, sql, bindings=None):
        if sql.strip().endswith(";"):
            sql = sql.strip()[:-1]

        if bindings is not None:
            bindings = [self._fix_binding(binding) for binding in bindings]
            sql = sql % tuple(bindings)
        logger.debug(f"execute sql:{sql}")
        try:
            self.result = self.spark.sql(sql)
            logger.debug("Executed with no errors")
            if "show tables" in sql:
                self.result = self.result.withColumn("description", F.lit(""))
Review comment: Why add the description column?
Reply: This is an Iceberg-specific issue: when using Iceberg the column is missing. I'll remove this from the PR.
        except Exception as e:
            logger.debug(f"raising error {e}")
            dbt.exceptions.raise_database_error(e)
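As a usage sketch, the wrapper acts as its own DB-API-style cursor; the call below is illustrative, not part of the PR:

# Hypothetical usage of the wrapper as connection and cursor in one.
conn = PysparkConnectionWrapper(python_module=None)  # falls back to getOrCreate()
cursor = conn.cursor()
cursor.execute("select %s as id, %s as name", bindings=[1, "alice"])
print(cursor.fetchall())  # collected rows of the resulting Spark DataFrame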
    @classmethod
    def _fix_binding(cls, value):
        """Convert complex datatypes to primitives that can be loaded by
        the Spark driver"""
        if isinstance(value, NUMBERS):
            return float(value)
        elif isinstance(value, dt.datetime):
            return "'" + value.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + "'"
        elif isinstance(value, str):
            return "'" + value + "'"
        else:
            logger.debug(type(value))
            return "'" + str(value) + "'"
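To illustrate the conversions, the return values below follow directly from the branches above:

# Sketch: what _fix_binding produces for each supported input type.
PysparkConnectionWrapper._fix_binding(3)                        # -> 3.0
PysparkConnectionWrapper._fix_binding("abc")                    # -> "'abc'"
PysparkConnectionWrapper._fix_binding(dt.datetime(2023, 1, 2))  # -> "'2023-01-02 00:00:00.000'"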
    @property
    def description(self):
        logger.debug(f"Description called returning list of columns: {self.result.columns}")
        ret = []
        # Not sure the type is ever used; specify it anyway
        string_type = sqlalchemy.types.String
        for column_name in self.result.columns:
            ret.append((column_name, string_type))
        return ret
@@ -0,0 +1,11 @@
{% macro source(source_name, identifier, start_dt = None, end_dt = None) %}
    {%- set relation = builtins.source(source_name, identifier) -%}

    {%- if execute and (relation.source_meta.python_module or relation.meta.python_module) -%}
        {%- do relation.load_python_module(start_dt, end_dt) -%}
        {# Return the view name only. Spark views do not support schema and catalog names #}
        {%- do return(relation.identifier) -%}
    {% else -%}
        {%- do return(relation) -%}
    {% endif -%}
{% endmacro %}
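For context, a model selecting from such a source could pass the optional date bounds through this macro override; the source and model names below are hypothetical:

-- models/my_model.sql (hypothetical)
select *
from {{ source('events', 'raw_events', start_dt='2023-01-01', end_dt='2023-01-31') }}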
Review comment: Functions and SparkSession are not used in this file.