Skip to content

Commit

Permalink
Merge pull request #132 from nrccua/DS-489-add-option-to-create-polar…
Browse files Browse the repository at this point in the history
…s-lazyframe-from-databri

Add option to create polars LazyFrame from databricks query via pyarrow
  • Loading branch information
nrccua-timr authored May 6, 2024
2 parents 8c68856 + fc5f698 commit 02ca264
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ History
=======


v0.20.19 (2024-05-06)

* Add option to create polars LazyFrame from databricks query via pyarrow.


v0.20.18 (2024-04-06)

* Add all variations of env names to ds_utils databricks catalog functions.
Expand Down
10 changes: 8 additions & 2 deletions aioradio/ds_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds
from haversine import haversine, Unit
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
from mlflow.tracking.client import MlflowClient
Expand Down Expand Up @@ -81,10 +82,15 @@ def ese_db_catalog(env):
return catalog


def sql_to_polars_df(sql):
def sql_to_polars_df(sql, lazy=False, batch_size=None):
"""Get polars DataFrame from SQL query results."""

return pl.from_arrow(pa.Table.from_batches(spark.sql(sql)._collect_as_arrow()))
if lazy:
df = pl.scan_pyarrow_dataset(ds.dataset(spark.sql(sql)._collect_as_arrow()), batch_size=batch_size)
else:
df = pl.from_arrow(pa.Table.from_batches(spark.sql(sql)._collect_as_arrow()))

return df


def does_db_table_exists(name):
Expand Down
32 changes: 16 additions & 16 deletions aioradio/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
aioboto3==12.3.0
aioboto3==12.4.0
aiojobs==1.2.1
backoff==2.2.1
boto3==1.34.34
botocore==1.34.34
cython==3.0.8
boto3==1.34.69
botocore==1.34.69
cython==3.0.10
databricks-connect==14.3.1
ddtrace==2.6.5
dominodatalab==1.3.0
fakeredis==2.21.1
dominodatalab==1.4.0
fakeredis==2.22.0
faust-cchardet==2.1.19
flask==2.1.2
flask-cors==3.0.10
Expand All @@ -18,21 +18,21 @@ mlflow==2.10.2
moto==3.1.18
openpyxl==3.0.10
orjson==3.9.15
pandas==2.2.1
polars==0.20.13
pre-commit==3.6.2
pandas==2.2.2
polars==0.20.23
pre-commit==3.7.0
psycopg2-binary==2.9.9
pyarrow==15.0.0
pyarrow==15.0.2
pylint==3.1.0
pyodbc==5.1.0 --no-binary=pyodbc
pysmb==1.2.9.1
pyspark==3.4.1
pytest==8.0.2
pyspark==3.4.3
pytest==8.1.2
pytest-asyncio==0.21.1
pytest-cov==4.1.0
pytest-cov==5.0.0
python-json-logger==2.0.7
redis==5.0.2
redis==5.0.4
twine==5.0.0
typing_extensions==4.10.0
typing_extensions==4.11.0
werkzeug==2.1.2
wheel==0.42.0
wheel==0.43.0
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.20.18',
version='0.20.19',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand All @@ -21,11 +21,11 @@
],
install_requires=[
'cython>=0.29.33',
'aioboto3==12.3.0',
'aioboto3==12.4.0',
'aiojobs>=1.0.0',
'backoff>=2.1.2',
'botocore==1.34.34',
'boto3==1.34.34',
'botocore==1.34.69',
'boto3==1.34.69',
'ddtrace>=0.60.1',
'faust-cchardet>=2.1.18',
'fakeredis>=2.20.0',
Expand Down

0 comments on commit 02ca264

Please sign in to comment.