feat(flink): implement a minimal PyFlink Backend
deepyaman authored and cpcloud committed Aug 21, 2023
1 parent d27374b commit 46d0e33
Showing 8 changed files with 298 additions and 8 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ibis-backends.yml
@@ -44,6 +44,7 @@ jobs:
      matrix:
        python-version:
          - "3.9"
+         - "3.10" # For PyFlink, which does not support Python 3.11 yet
          - "3.11"
    steps:
      - name: checkout
@@ -786,7 +787,7 @@ jobs:
          - windows-latest
        python-version:
          - "3.9"
-         - "3.11"
+         - "3.10"
    steps:
      - name: checkout
        uses: actions/checkout@v3
@@ -827,6 +828,10 @@ jobs:
      - name: install ibis
        run: poetry install --without dev --without docs --extras flink

+     # TODO(deepyaman): Remove step once Ibis and Flink are compatible.
+     - name: install pyflink
+       run: poetry run pip install apache-flink

      - name: show installed deps
        run: poetry run pip list

20 changes: 16 additions & 4 deletions gen_matrix.py
@@ -1,17 +1,27 @@
from __future__ import annotations

from pathlib import Path
+from typing import TYPE_CHECKING

import mkdocs_gen_files
import pandas as pd

import ibis
import ibis.expr.operations as ops

+if TYPE_CHECKING:
+    from collections.abc import Container, Sequence
+
+    from ibis.backends.base import BaseBackend
+
+
-def get_backends():
+def get_backends(exclude: Container[str] = ()) -> Sequence[tuple[str, BaseBackend]]:
    entry_points = sorted(ep.name for ep in ibis.util.backend_entry_points())
-    return [(backend, getattr(ibis, backend)) for backend in entry_points]
+    return [
+        (backend, getattr(ibis, backend))
+        for backend in entry_points
+        if backend not in exclude
+    ]


def get_leaf_classes(op):
@@ -29,14 +39,16 @@ def get_leaf_classes(op):
    ops.ScalarParameter,
}

-PUBLIC_OPS = (frozenset(get_leaf_classes(ops.Value))) - INTERNAL_OPS
+PUBLIC_OPS = frozenset(get_leaf_classes(ops.Value)) - INTERNAL_OPS


def main():
    support = {"operation": [f"{op.__module__}.{op.__name__}" for op in PUBLIC_OPS]}
    support.update(
        (name, list(map(backend.has_operation, PUBLIC_OPS)))
-        for name, backend in get_backends()
+        # exclude flink until https://github.com/apache/flink/pull/23141 is
+        # merged and released; we also need to roll it into poetry
+        for name, backend in get_backends(exclude=("flink",))
    )

    df = pd.DataFrame(support).set_index("operation").sort_index()
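For context on how `gen_matrix.py` consumes the new parameter, here is a minimal sketch (not part of the commit; it assumes `gen_matrix.py` is importable from the repo root):

```python
import ibis.expr.operations as ops

from gen_matrix import get_backends

# Mirror main(): probe each registered backend for operation support,
# skipping flink until its compiler and packaging issues are resolved.
for name, backend in get_backends(exclude=("flink",)):
    print(name, backend.has_operation(ops.Sum))
```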
269 changes: 269 additions & 0 deletions ibis/backends/flink/__init__.py
@@ -0,0 +1,269 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pyflink.version
import sqlglot as sg
from pyflink.table.types import create_arrow_schema

import ibis.common.exceptions as exc
import ibis.expr.operations as ops
import ibis.expr.schema as sch
from ibis.backends.base import BaseBackend, CanListDatabases
from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
from ibis.backends.flink.compiler.core import FlinkCompiler

if TYPE_CHECKING:
    from collections.abc import Mapping

    import pandas as pd
    import pyarrow as pa
    from pyflink.table import TableEnvironment

    import ibis.expr.types as ir


class Backend(BaseBackend, CanListDatabases):
    name = "flink"
    compiler = FlinkCompiler
    supports_temporary_tables = True
    supports_python_udfs = True

    def do_connect(self, t_env: TableEnvironment) -> None:
        """Create a Flink `Backend` for use with Ibis.

        Parameters
        ----------
        t_env
            A table environment

        Examples
        --------
        >>> import ibis
        >>> from pyflink.table import EnvironmentSettings, TableEnvironment
        >>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
        >>> ibis.flink.connect(t_env)
        <ibis.backends.flink.Backend at 0x...>
        """
        self._t_env = t_env

    def list_databases(self, like: str | None = None) -> list[str]:
        databases = self._t_env.list_databases()
        return self._filter_with_like(databases, like)

    @property
    def current_database(self) -> str:
        return self._t_env.get_current_database()

    def list_tables(
        self, like: str | None = None, database: str | None = None
    ) -> list[str]:
        tables = self._t_env._j_tenv.listTables(
            self._t_env.get_current_catalog(), database or self.current_database
        )
        return self._filter_with_like(tables, like)

    def _fully_qualified_name(self, name: str, database: str | None) -> str:
        if is_fully_qualified(name):
            return name

        return sg.table(name, db=database or self.current_database).sql(dialect="hive")

    def table(self, name: str, database: str | None = None) -> ir.Table:
        """Return a table expression from a table or view in the database.

        Parameters
        ----------
        name
            Table name
        database
            Database in which the table resides

        Returns
        -------
        Table
            Table named `name` from `database`
        """
        if database is not None and not isinstance(database, str):
            raise exc.IbisTypeError(
                f"`database` must be a string; got {type(database)}"
            )
        schema = self.get_schema(name, database=database)
        qualified_name = self._fully_qualified_name(name, database)
        _, quoted, unquoted = fully_qualified_re.search(qualified_name).groups()
        unqualified_name = quoted or unquoted
        node = ops.DatabaseTable(unqualified_name, schema, self, namespace=database)
        return node.to_expr()

    def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema:
        """Return a Schema object for the indicated table and database.

        Parameters
        ----------
        table_name
            Table name
        database
            Database name

        Returns
        -------
        sch.Schema
            Ibis schema
        """
        qualified_name = self._fully_qualified_name(table_name, database)
        table = self._t_env.from_path(qualified_name)
        schema = table.get_schema()
        return sch.Schema.from_pyarrow(
            create_arrow_schema(schema.get_field_names(), schema.get_field_data_types())
        )

    @property
    def version(self) -> str:
        return pyflink.version.__version__

    def compile(
        self,
        expr: ir.Expr,
        params: Mapping[ir.Expr, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Compile an expression."""
        return super().compile(expr, params=params)  # Discard `limit` and other kwargs.

    def _to_sql(self, expr: ir.Expr, **kwargs: Any) -> str:
        return str(self.compile(expr, **kwargs))

    def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
        """Execute an expression."""
        table_expr = expr.as_table()
        sql = self.compile(table_expr, **kwargs)
        df = self._t_env.sql_query(sql).to_pandas()

        # TODO: remove the extra conversion
        return expr.__pandas_result__(table_expr.__pandas_result__(df))

    def create_table(
        self,
        name: str,
        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
        *,
        schema: sch.Schema | None = None,
        database: str | None = None,
        temp: bool = False,
        overwrite: bool = False,
    ) -> ir.Table:
        """Create a new table in Flink.

        Parameters
        ----------
        name
            Name of the new table.
        obj
            An Ibis table expression or pandas table that will be used to
            extract the schema and the data of the new table. If not provided,
            `schema` must be given.
        schema
            The schema for the new table. Only one of `schema` or `obj` can be
            provided.
        database
            Name of the database where the table will be created, if not the
            default.
        temp
            Whether a table is temporary or not
        overwrite
            Whether to clobber existing data

        Returns
        -------
        Table
            The table that was created.
        """
        import pandas as pd
        import pyarrow as pa

        if obj is None and schema is None:
            raise exc.IbisError("The schema or obj parameter is required")
        if isinstance(obj, pa.Table):
            obj = obj.to_pandas()
        if isinstance(obj, pd.DataFrame):
            qualified_name = self._fully_qualified_name(name, database)
            table = self._t_env.from_pandas(obj)
            # FIXME(deepyaman): Create a catalog table, not a temp view.
            self._t_env.create_temporary_view(qualified_name, table)
        else:
            raise NotImplementedError  # TODO(deepyaman)

        return self.table(name, database=database)

    def drop_table(
        self,
        name: str,
        *,
        database: str | None = None,
        force: bool = False,
    ) -> None:
        """Drop a table.

        Parameters
        ----------
        name
            Name of the table to drop.
        database
            Name of the database where the table exists, if not the default.
        force
            If `False`, an exception is raised if the table does not exist.
        """
        qualified_name = self._fully_qualified_name(name, database)
        if not (self._t_env.drop_temporary_table(qualified_name) or force):
            raise exc.IntegrityError(f"Table {name} does not exist.")

        # TODO(deepyaman): Support (and differentiate) permanent tables.

    def create_view(
        self,
        name: str,
        obj: ir.Table,
        *,
        database: str | None = None,
        overwrite: bool = False,
    ) -> ir.Table:
        """Create a new view from an expression.

        Parameters
        ----------
        name
            Name of the new view.
        obj
            An Ibis table expression that will be used to create the view.
        database
            Name of the database where the view will be created, if not
            provided the database's default is used.
        overwrite
            Whether to clobber an existing view with the same name

        Returns
        -------
        Table
            The view that was created.
        """
        raise NotImplementedError

    def drop_view(
        self, name: str, *, database: str | None = None, force: bool = False
    ) -> None:
        """Drop a view.

        Parameters
        ----------
        name
            Name of the view to drop.
        database
            Name of the database where the view exists, if not the default.
        force
            If `False`, an exception is raised if the view does not exist.
        """
        qualified_name = self._fully_qualified_name(name, database)
        if not (self._t_env.drop_temporary_view(qualified_name) or force):
            raise exc.IntegrityError(f"View {name} does not exist.")

        # TODO(deepyaman): Support (and differentiate) permanent views.
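To see the pieces of the new backend working together, a hypothetical end-to-end session might look like the following (illustrative only, not from the commit; the table and column names are invented, and whether a given expression compiles depends on the minimal compiler's operation coverage):

```python
import pandas as pd
from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis

# Wrap an existing PyFlink TableEnvironment, per the do_connect docstring.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(t_env)

# create_table currently registers a pandas DataFrame as a temporary view.
df = pd.DataFrame({"fruit": ["apple", "banana", "apple"], "qty": [3, 5, 2]})
t = con.create_table("fruits", df)

# Expressions compile to Flink SQL and run through t_env.sql_query(...).
expr = t.group_by("fruit").agg(total=t.qty.sum())
print(con.compile(expr))  # inspect the generated SQL
print(expr.execute())     # round-trips through pandas, per execute()
```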
2 changes: 1 addition & 1 deletion ibis/backends/impala/__init__.py
@@ -226,7 +226,7 @@ def do_connect(
        pool_size: int = 8,
        hdfs_client: fsspec.spec.AbstractFileSystem | None = None,
    ):
-        """Create an Impala Backend for use with Ibis.
+        """Create an Impala `Backend` for use with Ibis.

        Parameters
        ----------
2 changes: 1 addition & 1 deletion ibis/backends/tests/test_dot_sql.py
@@ -237,7 +237,7 @@ def test_dot_sql_reuse_alias_with_different_types(backend, alltypes, df):
    backend.assert_series_equal(foo2.x.execute(), expected2)


-_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid"}
+_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid", "flink"}
no_sqlglot_dialect = sorted(
    param(backend, marks=pytest.mark.xfail) for backend in _NO_SQLGLOT_DIALECT
)
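The `xfail` marks above mean the dialect-less backends, now including Flink, are still collected but are expected to fail rather than break the suite. A self-contained sketch of the same mechanism (the backend names and test body here are only stand-ins, not the repo's actual test):

```python
import pytest
from pytest import param

_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid", "flink"}
_HAS_DIALECT = {"duckdb", "postgres"}  # illustrative stand-ins


@pytest.mark.parametrize(
    "name",
    sorted(_HAS_DIALECT)
    + [param(name, marks=pytest.mark.xfail) for name in sorted(_NO_SQLGLOT_DIALECT)],
)
def test_backend_has_sqlglot_dialect(name):
    # The xfail-marked cases report XFAIL instead of erroring the run.
    assert name in _HAS_DIALECT
```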
3 changes: 2 additions & 1 deletion justfile
@@ -59,7 +59,8 @@ doctest *args:
            -and -not -wholename '*test*.py' \
            -and -not -wholename '*__init__*' \
            -and -not -wholename '*gen_*.py' \
-           -and -not -wholename '*ibis/expr/selectors.py'
+           -and -not -wholename '*ibis/expr/selectors.py' \
+           -and -not -wholename '*ibis/backends/flink/*' # FIXME(deepyaman)
    )

# download testing data
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -58,6 +58,7 @@ plugins:
          - backends/badges.md
          - backends/*_support_matrix.csv
          - backends/app/*
+         - backends/flink/* # FIXME(deepyaman)
          - CONTRIBUTING.md
  - gen-files:
      scripts:
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -225,6 +225,7 @@ dask = "ibis.backends.dask"
datafusion = "ibis.backends.datafusion"
druid = "ibis.backends.druid"
duckdb = "ibis.backends.duckdb"
+flink = "ibis.backends.flink"
impala = "ibis.backends.impala"
mysql = "ibis.backends.mysql"
mssql = "ibis.backends.mssql"
@@ -347,6 +348,7 @@ markers = [
    "datafusion: Apache Datafusion tests",
    "druid: Apache Druid tests",
    "duckdb: DuckDB tests",
+    "flink: Flink tests",
    "impala: Apache Impala tests",
    "mysql: MySQL tests",
    "mssql: MS SQL Server tests",
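The `flink = "ibis.backends.flink"` line registers the backend under Ibis's `"ibis.backends"` entry-point group, which is how `ibis.flink` becomes discoverable. A sketch of inspecting that registration (assumes Python 3.10+ for the `group=` keyword):

```python
from importlib.metadata import entry_points

# List backends registered under Ibis's entry-point group; after this
# commit the output should include a line like: flink -> ibis.backends.flink
for ep in entry_points(group="ibis.backends"):
    print(ep.name, "->", ep.value)
```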
