diff --git a/.github/workflows/ibis-backends.yml b/.github/workflows/ibis-backends.yml
index 0faca64093d4..5637c4c7116e 100644
--- a/.github/workflows/ibis-backends.yml
+++ b/.github/workflows/ibis-backends.yml
@@ -44,6 +44,7 @@ jobs:
       matrix:
         python-version:
           - "3.9"
+          - "3.10" # For PyFlink, which does not support Python 3.11 yet
           - "3.11"
     steps:
       - name: checkout
@@ -786,7 +787,7 @@ jobs:
           - windows-latest
         python-version:
           - "3.9"
-          - "3.11"
+          - "3.10"
     steps:
       - name: checkout
         uses: actions/checkout@v3
@@ -827,6 +828,10 @@ jobs:
       - name: install ibis
         run: poetry install --without dev --without docs --extras flink
 
+      # TODO(deepyaman): Remove step once Ibis and Flink are compatible.
+      - name: install pyflink
+        run: poetry run pip install apache-flink
+
       - name: show installed deps
         run: poetry run pip list
diff --git a/gen_matrix.py b/gen_matrix.py
index 41aa75be6f1e..c9f65dab9e9d 100644
--- a/gen_matrix.py
+++ b/gen_matrix.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from pathlib import Path
+from typing import TYPE_CHECKING
 
 import mkdocs_gen_files
 import pandas as pd
@@ -8,10 +9,19 @@
 import ibis
 import ibis.expr.operations as ops
 
+if TYPE_CHECKING:
+    from collections.abc import Container, Sequence
 
-def get_backends():
+    from ibis.backends.base import BaseBackend
+
+
+def get_backends(exclude: Container[str] = ()) -> Sequence[tuple[str, BaseBackend]]:
     entry_points = sorted(ep.name for ep in ibis.util.backend_entry_points())
-    return [(backend, getattr(ibis, backend)) for backend in entry_points]
+    return [
+        (backend, getattr(ibis, backend))
+        for backend in entry_points
+        if backend not in exclude
+    ]
 
 
 def get_leaf_classes(op):
@@ -29,14 +39,16 @@ def get_leaf_classes(op):
     ops.ScalarParameter,
 }
 
-PUBLIC_OPS = (frozenset(get_leaf_classes(ops.Value))) - INTERNAL_OPS
+PUBLIC_OPS = frozenset(get_leaf_classes(ops.Value)) - INTERNAL_OPS
 
 
 def main():
     support = {"operation": [f"{op.__module__}.{op.__name__}" for op in PUBLIC_OPS]}
     support.update(
         (name, list(map(backend.has_operation, PUBLIC_OPS)))
-        for name, backend in get_backends()
+        # Exclude flink until https://github.com/apache/flink/pull/23141 is
+        # merged and released; we also need to roll it into poetry.
+        for name, backend in get_backends(exclude=("flink",))
     )
 
     df = pd.DataFrame(support).set_index("operation").sort_index()
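
A minimal, self-contained sketch of the filtering behavior the new `exclude` parameter adds, using a plain list of names in place of `ibis.util.backend_entry_points()`; `get_backend_names` is a hypothetical stand-in for illustration only:

from __future__ import annotations

from collections.abc import Container, Sequence


def get_backend_names(names: Sequence[str], exclude: Container[str] = ()) -> list[str]:
    # Sort for stable output, then drop excluded backends, mirroring the
    # comprehension added to get_backends() above.
    return [name for name in sorted(names) if name not in exclude]


print(get_backend_names(["flink", "duckdb", "impala"], exclude=("flink",)))
# prints: ['duckdb', 'impala']
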
diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py
index e69de29bb2d1..904038ae3dbe 100644
--- a/ibis/backends/flink/__init__.py
+++ b/ibis/backends/flink/__init__.py
@@ -0,0 +1,269 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pyflink.version
+import sqlglot as sg
+from pyflink.table.types import create_arrow_schema
+
+import ibis.common.exceptions as exc
+import ibis.expr.operations as ops
+import ibis.expr.schema as sch
+from ibis.backends.base import BaseBackend, CanListDatabases
+from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
+from ibis.backends.flink.compiler.core import FlinkCompiler
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping
+
+    import pandas as pd
+    import pyarrow as pa
+    from pyflink.table import TableEnvironment
+
+    import ibis.expr.types as ir
+
+
+class Backend(BaseBackend, CanListDatabases):
+    name = "flink"
+    compiler = FlinkCompiler
+    supports_temporary_tables = True
+    supports_python_udfs = True
+
+    def do_connect(self, t_env: TableEnvironment) -> None:
+        """Create a Flink `Backend` for use with Ibis.
+
+        Parameters
+        ----------
+        t_env
+            A table environment
+
+        Examples
+        --------
+        >>> import ibis
+        >>> from pyflink.table import EnvironmentSettings, TableEnvironment
+        >>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        >>> ibis.flink.connect(t_env)
+
+        """
+        self._t_env = t_env
+
+    def list_databases(self, like: str | None = None) -> list[str]:
+        databases = self._t_env.list_databases()
+        return self._filter_with_like(databases, like)
+
+    @property
+    def current_database(self) -> str:
+        return self._t_env.get_current_database()
+
+    def list_tables(
+        self, like: str | None = None, database: str | None = None
+    ) -> list[str]:
+        tables = self._t_env._j_tenv.listTables(
+            self._t_env.get_current_catalog(), database or self.current_database
+        )
+        return self._filter_with_like(tables, like)
+
+    def _fully_qualified_name(self, name: str, database: str | None) -> str:
+        if is_fully_qualified(name):
+            return name
+
+        return sg.table(name, db=database or self.current_database).sql(dialect="hive")
+
+    def table(self, name: str, database: str | None = None) -> ir.Table:
+        """Return a table expression from a table or view in the database.
+
+        Parameters
+        ----------
+        name
+            Table name
+        database
+            Database in which the table resides
+
+        Returns
+        -------
+        Table
+            Table named `name` from `database`
+        """
+        if database is not None and not isinstance(database, str):
+            raise exc.IbisTypeError(
+                f"`database` must be a string; got {type(database)}"
+            )
+        schema = self.get_schema(name, database=database)
+        qualified_name = self._fully_qualified_name(name, database)
+        _, quoted, unquoted = fully_qualified_re.search(qualified_name).groups()
+        unqualified_name = quoted or unquoted
+        node = ops.DatabaseTable(unqualified_name, schema, self, namespace=database)
+        return node.to_expr()
+
+    def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema:
+        """Return a Schema object for the indicated table and database.
+
+        Parameters
+        ----------
+        table_name
+            Table name
+        database
+            Database name
+
+        Returns
+        -------
+        sch.Schema
+            Ibis schema
+        """
+        qualified_name = self._fully_qualified_name(table_name, database)
+        table = self._t_env.from_path(qualified_name)
+        schema = table.get_schema()
+        return sch.Schema.from_pyarrow(
+            create_arrow_schema(schema.get_field_names(), schema.get_field_data_types())
+        )
+
+    @property
+    def version(self) -> str:
+        return pyflink.version.__version__
+
+    def compile(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Expr, Any] | None = None,
+        **kwargs: Any,
+    ) -> Any:
+        """Compile an expression."""
+        return super().compile(expr, params=params)  # Discard `limit` and other kwargs.
+
+    def _to_sql(self, expr: ir.Expr, **kwargs: Any) -> str:
+        return str(self.compile(expr, **kwargs))
+
+    def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
+        """Execute an expression."""
+        table_expr = expr.as_table()
+        sql = self.compile(table_expr, **kwargs)
+        df = self._t_env.sql_query(sql).to_pandas()
+
+        # TODO: remove the extra conversion
+        return expr.__pandas_result__(table_expr.__pandas_result__(df))
+
+    def create_table(
+        self,
+        name: str,
+        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        *,
+        schema: sch.Schema | None = None,
+        database: str | None = None,
+        temp: bool = False,
+        overwrite: bool = False,
+    ) -> ir.Table:
+        """Create a new table in Flink.
+
+        Parameters
+        ----------
+        name
+            Name of the new table.
+        obj
+            An Ibis table expression, pandas DataFrame, or PyArrow table that
+            will be used to extract the schema and the data of the new table.
+            If not provided, `schema` must be given.
+        schema
+            The schema for the new table. Only one of `schema` or `obj` can be
+            provided.
+        database
+            Name of the database where the table will be created, if not the
+            default.
+        temp
+            Whether the table is temporary or not
+        overwrite
+            Whether to clobber existing data
+
+        Returns
+        -------
+        Table
+            The table that was created.
+        """
+        import pandas as pd
+        import pyarrow as pa
+
+        if obj is None and schema is None:
+            raise exc.IbisError("The schema or obj parameter is required")
+        if isinstance(obj, pa.Table):
+            obj = obj.to_pandas()
+        if isinstance(obj, pd.DataFrame):
+            qualified_name = self._fully_qualified_name(name, database)
+            table = self._t_env.from_pandas(obj)
+            # FIXME(deepyaman): Create a catalog table, not a temp view.
+            self._t_env.create_temporary_view(qualified_name, table)
+        else:
+            raise NotImplementedError  # TODO(deepyaman)
+
+        return self.table(name, database=database)
+
+    def drop_table(
+        self,
+        name: str,
+        *,
+        database: str | None = None,
+        force: bool = False,
+    ) -> None:
+        """Drop a table.
+
+        Parameters
+        ----------
+        name
+            Name of the table to drop.
+        database
+            Name of the database where the table exists, if not the default.
+        force
+            If `False`, an exception is raised if the table does not exist.
+        """
+        qualified_name = self._fully_qualified_name(name, database)
+        if not (self._t_env.drop_temporary_table(qualified_name) or force):
+            raise exc.IntegrityError(f"Table {name} does not exist.")
+
+        # TODO(deepyaman): Support (and differentiate) permanent tables.
+
+    def create_view(
+        self,
+        name: str,
+        obj: ir.Table,
+        *,
+        database: str | None = None,
+        overwrite: bool = False,
+    ) -> ir.Table:
+        """Create a new view from an expression.
+
+        Parameters
+        ----------
+        name
+            Name of the new view.
+        obj
+            An Ibis table expression that will be used to create the view.
+        database
+            Name of the database where the view will be created; if not
+            provided, the default database is used.
+        overwrite
+            Whether to clobber an existing view with the same name
+
+        Returns
+        -------
+        Table
+            The view that was created.
+        """
+        raise NotImplementedError
+
+    def drop_view(
+        self, name: str, *, database: str | None = None, force: bool = False
+    ) -> None:
+        """Drop a view.
+
+        Parameters
+        ----------
+        name
+            Name of the view to drop.
+        database
+            Name of the database where the view exists, if not the default.
+        force
+            If `False`, an exception is raised if the view does not exist.
+        """
+        qualified_name = self._fully_qualified_name(name, database)
+        if not (self._t_env.drop_temporary_view(qualified_name) or force):
+            raise exc.IntegrityError(f"View {name} does not exist.")
+
+        # TODO(deepyaman): Support (and differentiate) permanent views.
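
A usage sketch assembled from the `do_connect` docstring and `create_table` above; it assumes `apache-flink` is installed (per the CI step added earlier), and whether a given expression compiles depends on `FlinkCompiler` coverage:

import pandas as pd
from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis

# Build a streaming TableEnvironment and hand it to Ibis, as in the docstring.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(t_env)

# create_table() currently registers a pandas DataFrame as a temporary view.
t = con.create_table("t", pd.DataFrame({"a": [1, 2, 3]}))
print(con.list_tables())
print(t.a.sum().execute())  # compiled by FlinkCompiler, run via sql_query()
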
diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py
index 42470b7d12ac..c81153e2fc20 100644
--- a/ibis/backends/impala/__init__.py
+++ b/ibis/backends/impala/__init__.py
@@ -226,7 +226,7 @@ def do_connect(
         pool_size: int = 8,
         hdfs_client: fsspec.spec.AbstractFileSystem | None = None,
     ):
-        """Create an Impala Backend for use with Ibis.
+        """Create an Impala `Backend` for use with Ibis.
 
         Parameters
         ----------
diff --git a/ibis/backends/tests/test_dot_sql.py b/ibis/backends/tests/test_dot_sql.py
index 6957dab41d81..8f7298250476 100644
--- a/ibis/backends/tests/test_dot_sql.py
+++ b/ibis/backends/tests/test_dot_sql.py
@@ -237,7 +237,7 @@ def test_dot_sql_reuse_alias_with_different_types(backend, alltypes, df):
     backend.assert_series_equal(foo2.x.execute(), expected2)
 
 
-_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid"}
+_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid", "flink"}
 no_sqlglot_dialect = sorted(
     param(backend, marks=pytest.mark.xfail) for backend in _NO_SQLGLOT_DIALECT
 )
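
For readers unfamiliar with the `no_sqlglot_dialect` pattern above, a self-contained sketch of how `pytest.param` with an `xfail` mark behaves; the test body is a placeholder, not the real dot-sql test:

import pytest
from pytest import param

_NO_SQLGLOT_DIALECT = {"pandas", "dask", "datafusion", "druid", "flink"}
# Each backend becomes a parameter that is expected to fail until it grows a
# sqlglot dialect; sorting keeps the collected test IDs deterministic.
no_sqlglot_dialect = sorted(
    param(backend, marks=pytest.mark.xfail) for backend in _NO_SQLGLOT_DIALECT
)


@pytest.mark.parametrize("dialect", no_sqlglot_dialect)
def test_dialect_placeholder(dialect):
    raise NotImplementedError(f"no sqlglot dialect for {dialect}")
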
diff --git a/justfile b/justfile
index 69e14bd5d870..6b204daddf9e 100644
--- a/justfile
+++ b/justfile
@@ -59,7 +59,8 @@ doctest *args:
             -and -not -wholename '*test*.py' \
             -and -not -wholename '*__init__*' \
             -and -not -wholename '*gen_*.py' \
-            -and -not -wholename '*ibis/expr/selectors.py'
+            -and -not -wholename '*ibis/expr/selectors.py' \
+            -and -not -wholename '*ibis/backends/flink/*' # FIXME(deepyaman)
     )
 
 # download testing data
diff --git a/mkdocs.yml b/mkdocs.yml
index 6d7420fb9eb5..8bdd9e6e30cb 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -58,6 +58,7 @@ plugins:
       - backends/badges.md
       - backends/*_support_matrix.csv
       - backends/app/*
+      - backends/flink/* # FIXME(deepyaman)
       - CONTRIBUTING.md
   - gen-files:
       scripts:
diff --git a/pyproject.toml b/pyproject.toml
index e4ffa6f908e9..503cafa3a505 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -225,6 +225,7 @@ dask = "ibis.backends.dask"
 datafusion = "ibis.backends.datafusion"
 druid = "ibis.backends.druid"
 duckdb = "ibis.backends.duckdb"
+flink = "ibis.backends.flink"
 impala = "ibis.backends.impala"
 mysql = "ibis.backends.mysql"
 mssql = "ibis.backends.mssql"
@@ -347,6 +348,7 @@ markers = [
   "datafusion: Apache Datafusion tests",
   "druid: Apache Druid tests",
   "duckdb: DuckDB tests",
+  "flink: Flink tests",
   "impala: Apache Impala tests",
   "mysql: MySQL tests",
   "mssql: MS SQL Server tests",
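
The `flink = "ibis.backends.flink"` line registers the backend as a plugin entry point. A sketch of runtime discovery, assuming the surrounding pyproject section is the `"ibis.backends"` entry-point group (which the neighboring `duckdb`/`impala` lines suggest, and which `ibis.util.backend_entry_points()` in gen_matrix.py reads):

from importlib.metadata import entry_points

# Keyword filtering requires Python 3.10+; on 3.9, entry_points() returns a
# mapping that must be indexed by group name instead.
for ep in entry_points(group="ibis.backends"):
    if ep.name == "flink":
        backend_module = ep.load()  # imports ibis.backends.flink
        print(ep.name, backend_module)
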