diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md
index 6ba630d18f766..888c4dfc6bae3 100644
--- a/docs/content.zh/docs/dev/python/table/table_environment.md
+++ b/docs/content.zh/docs/dev/python/table/table_environment.md
@@ -435,46 +435,6 @@ TableEnvironment API
-废弃的 APIs
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">APIs</th>
-      <th class="text-center">描述</th>
-      <th class="text-center">文档</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>
-        <strong>register_function(name, function)</strong>
-      </td>
-      <td>
-        注册一个 Python 用户自定义函数,并为其指定一个唯一的名称。
-        若已有与该名称相同的用户自定义函数,则替换之。
-        它可以通过 create_temporary_system_function 来替换。
-      </td>
-      <td class="text-center">
-        {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_function" name="链接">}}
-      </td>
-    </tr>
-    <tr>
-      <td>
-        <strong>register_java_function(name, function_class_name)</strong>
-      </td>
-      <td>
-        注册一个 Java 用户自定义函数,并为其指定一个唯一的名称。
-        若已有与该名称相同的用户自定义函数,则替换之。
-        它可以通过 create_java_temporary_system_function 来替换。
-      </td>
-      <td class="text-center">
-        {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_java_function" name="链接">}}
-      </td>
-    </tr>
-  </tbody>
-</table>
### 依赖管理
这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。
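For migration reference: the removed table names `create_temporary_system_function` as the replacement for `register_function`. A minimal sketch of that route against the PyFlink Table API (the function name and data are illustrative):

```python
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import call, col
from pyflink.table.udf import udf

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Replacement for the removed register_function(name, function):
add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
table_env.create_temporary_system_function("add_one", add_one)

tab = table_env.from_elements([(1,), (2,)], ['a'])
tab.select(call("add_one", col("a"))).execute().print()
```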
diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md b/docs/content.zh/docs/dev/table/functions/udfs.md
index f137b2ba4466e..f369eccf541c5 100644
--- a/docs/content.zh/docs/dev/table/functions/udfs.md
+++ b/docs/content.zh/docs/dev/table/functions/udfs.md
@@ -1578,46 +1578,6 @@ public static class WeightedAvg extends AggregateFunction
-Deprecated APIs
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">APIs</th>
-      <th class="text-center">Description</th>
-      <th class="text-center">Docs</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>
-        <strong>register_function(name, function)</strong>
-      </td>
-      <td>
-        Registers a Python user-defined function under a unique name.
-        Replaces already existing user-defined function under this name.
-        It can be replaced by create_temporary_system_function.
-      </td>
-      <td class="text-center">
-        {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_function" name="link">}}
-      </td>
-    </tr>
-    <tr>
-      <td>
-        <strong>register_java_function(name, function_class_name)</strong>
-      </td>
-      <td>
-        Registers a Java user-defined function under a unique name.
-        Replaces already existing user-defined functions under this name.
-        It can be replaced by create_java_temporary_system_function.
-      </td>
-      <td class="text-center">
-        {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_java_function" name="link">}}
-      </td>
-    </tr>
-  </tbody>
-</table>
### Dependency Management
These APIs are used to manage the Python dependencies which are required by the Python UDFs.
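Likewise, the removed table points at `create_java_temporary_system_function` for Java functions. A hedged sketch; the class name is illustrative, and the class must be on the JVM classpath with a public no-argument constructor:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Replacement for the removed register_java_function(name, function_class_name):
table_env.create_java_temporary_system_function(
    "java_upper", "com.example.udf.JavaUpper")
```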
diff --git a/docs/content/docs/dev/table/data_stream_api.md b/docs/content/docs/dev/table/data_stream_api.md
index eabc11662c409..a7217053b0889 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -592,6 +592,7 @@ env = StreamExecutionEnvironment.get_execution_environment()
# set various configuration early
env.set_max_parallelism(256)
+env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name")
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
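The docstrings removed further down in this patch direct users to the `pipeline.serialization-config` option instead of programmatic registration. A sketch of that route; the entry syntax follows our reading of the Flink 1.19+ serialization docs (an assumption worth verifying), and the class names are illustrative:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# One entry per type; check the exact value syntax under
# "Data Types & Serialization" in the Flink docs (assumed 1.19+ format).
config.set_string(
    "pipeline.serialization-config",
    "[org.example.MyType: {type: kryo, kryo-type: registered,"
    " class: org.example.MyTypeSerializer}]")
env = StreamExecutionEnvironment.get_execution_environment(config)
```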
diff --git a/flink-python/pyflink/common/execution_config.py b/flink-python/pyflink/common/execution_config.py
index ad4934a0e5e66..def9e1bb92d1a 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -15,14 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import warnings
from typing import Dict, List
from pyflink.common.execution_mode import ExecutionMode
-from pyflink.common.input_dependency_constraint import InputDependencyConstraint
from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import load_java_class
__all__ = ['ExecutionConfig']
@@ -284,58 +281,6 @@ def get_execution_mode(self) -> 'ExecutionMode':
j_execution_mode = self._j_execution_config.getExecutionMode()
return ExecutionMode._from_j_execution_mode(j_execution_mode)
- def set_default_input_dependency_constraint(
- self, input_dependency_constraint: InputDependencyConstraint) -> 'ExecutionConfig':
- """
- Sets the default input dependency constraint for vertex scheduling. It indicates when a
- task should be scheduled considering its inputs status.
-
- The default constraint is :data:`InputDependencyConstraint.ANY`.
-
- Example:
- ::
-
- >>> config.set_default_input_dependency_constraint(InputDependencyConstraint.ALL)
-
- :param input_dependency_constraint: The input dependency constraint. The constraints could
- be :data:`InputDependencyConstraint.ANY` or
- :data:`InputDependencyConstraint.ALL`.
-
- .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
- current scheduler implementations.
- """
- warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
- "Therefore, the method call set_default_input_dependency_constraint is "
- "obsolete.", DeprecationWarning)
-
- self._j_execution_config.setDefaultInputDependencyConstraint(
- input_dependency_constraint._to_j_input_dependency_constraint())
- return self
-
- def get_default_input_dependency_constraint(self) -> 'InputDependencyConstraint':
- """
- Gets the default input dependency constraint for vertex scheduling. It indicates when a
- task should be scheduled considering its inputs status.
-
- The default constraint is :data:`InputDependencyConstraint.ANY`.
-
- .. seealso:: :func:`set_default_input_dependency_constraint`
-
- :return: The input dependency constraint of this job. The possible constraints are
- :data:`InputDependencyConstraint.ANY` and :data:`InputDependencyConstraint.ALL`.
-
- .. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
- current scheduler implementations.
- """
- warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
- "Therefore, the method call get_default_input_dependency_constraint is "
- "obsolete.", DeprecationWarning)
-
- j_input_dependency_constraint = self._j_execution_config\
- .getDefaultInputDependencyConstraint()
- return InputDependencyConstraint._from_j_input_dependency_constraint(
- j_input_dependency_constraint)
-
def enable_force_kryo(self) -> 'ExecutionConfig':
"""
Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
@@ -532,85 +477,6 @@ def set_global_job_parameters(self, global_job_parameters_dict: Dict) -> 'Execut
self._j_execution_config.setGlobalJobParameters(j_global_job_parameters)
return self
- def add_default_kryo_serializer(self,
- type_class_name: str,
- serializer_class_name: str) -> 'ExecutionConfig':
- """
- Adds a new Kryo default serializer to the Runtime.
-
- Example:
- ::
-
- >>> config.add_default_kryo_serializer("com.aaa.bbb.PojoClass",
- ... "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with the
- given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
- """
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_execution_config.addDefaultKryoSerializer(type_clz, j_serializer_clz)
- return self
-
- def register_type_with_kryo_serializer(self,
- type_class_name: str,
- serializer_class_name: str) -> 'ExecutionConfig':
- """
- Registers the given Serializer via its class as a serializer for the given type at the
- KryoSerializer.
-
- Example:
- ::
-
- >>> config.register_type_with_kryo_serializer("com.aaa.bbb.PojoClass",
- ... "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with
- the given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
- """
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_execution_config.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)
- return self
-
- def register_pojo_type(self, type_class_name: str) -> 'ExecutionConfig':
- """
- Registers the given type with the serialization stack. If the type is eventually
- serialized as a POJO, then the type is registered with the POJO serializer. If the
- type ends up being serialized with Kryo, then it will be registered at Kryo to make
- sure that only tags are written.
-
- Example:
- ::
-
- >>> config.register_pojo_type("com.aaa.bbb.PojoClass")
-
- :param type_class_name: The full-qualified java class name of the type to register.
- """
- type_clz = load_java_class(type_class_name)
- self._j_execution_config.registerPojoType(type_clz)
- return self
-
- def register_kryo_type(self, type_class_name: str) -> 'ExecutionConfig':
- """
- Registers the given type with the serialization stack. If the type is eventually
- serialized as a POJO, then the type is registered with the POJO serializer. If the
- type ends up being serialized with Kryo, then it will be registered at Kryo to make
- sure that only tags are written.
-
- Example:
- ::
-
- >>> config.register_kryo_type("com.aaa.bbb.KryoClass")
-
- :param type_class_name: The full-qualified java class name of the type to register.
- """
- type_clz = load_java_class(type_class_name)
- self._j_execution_config.registerKryoType(type_clz)
- return self
-
def get_registered_types_with_kryo_serializer_classes(self) -> Dict[str, str]:
"""
Returns the registered types with their Kryo Serializer classes.
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 7eab1e7efdc03..bb41b342fc95c 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -165,52 +165,6 @@ def test_get_set_global_job_parameters(self):
self.assertEqual(self.execution_config.get_global_job_parameters(), {"hello": "world"})
- def test_add_default_kryo_serializer(self):
-
- self.execution_config.add_default_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.execution_config.get_default_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_type_with_kryo_serializer(self):
-
- self.execution_config.register_type_with_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.execution_config.get_registered_types_with_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_pojo_type(self):
-
- self.execution_config.register_pojo_type(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
- type_list = self.execution_config.get_registered_pojo_types()
-
- self.assertEqual(type_list,
- ["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])
-
- def test_register_kryo_type(self):
-
- self.execution_config.register_kryo_type(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
- type_list = self.execution_config.get_registered_kryo_types()
-
- self.assertEqual(type_list,
- ["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])
-
def test_auto_type_registration(self):
self.assertFalse(self.execution_config.is_auto_type_registration_disabled())
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 246cbe16d92fb..31aa639e8e3b0 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -17,7 +17,6 @@
################################################################################
import os
import tempfile
-import warnings
from typing import List, Any, Optional, cast
@@ -40,7 +39,7 @@
from pyflink.datastream.utils import ResultTypeQueryable
from pyflink.java_gateway import get_gateway
from pyflink.serializers import PickleSerializer
-from pyflink.util.java_utils import load_java_class, add_jars_to_context_class_loader, \
+from pyflink.util.java_utils import add_jars_to_context_class_loader, \
invoke_method, get_field_value, is_local_deployment, get_j_env_configuration
@@ -376,85 +375,6 @@ def get_default_savepoint_directory(self) -> Optional[str]:
else:
return j_path.toString()
- def add_default_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
- """
- Adds a new Kryo default serializer to the Runtime.
-
- Example:
- ::
-
- >>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with the
- given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
-
- .. note:: Deprecated since version 1.19: Register data types and serializers through hard
- codes is deprecated, because you need to modify the codes when upgrading job
- version. You should configure this by option `pipeline.serialization-config`.
- """
- warnings.warn("Deprecated since version 1.19: Register data types and serializers through"
- " hard codes is deprecated, because you need to modify the codes when"
- " upgrading job version. You should configure this by config option "
- " 'pipeline.serialization-config'.", DeprecationWarning)
-
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_stream_execution_environment.addDefaultKryoSerializer(type_clz, j_serializer_clz)
-
- def register_type_with_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
- """
- Registers the given Serializer via its class as a serializer for the given type at the
- KryoSerializer.
-
- Example:
- ::
-
- >>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
- ... "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with
- the given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
-
- .. note:: Deprecated since version 1.19: Register data types and serializers through hard
- codes is deprecated, because you need to modify the codes when upgrading job
- version. You should configure this by option `pipeline.serialization-config`.
- """
- warnings.warn("Deprecated since version 1.19: Register data types and serializers through"
- " hard codes is deprecated, because you need to modify the codes when"
- " upgrading job version. You should configure this by config option "
- " 'pipeline.serialization-config'.", DeprecationWarning)
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_stream_execution_environment.registerTypeWithKryoSerializer(
- type_clz, j_serializer_clz)
-
- def register_type(self, type_class_name: str):
- """
- Registers the given type with the serialization stack. If the type is eventually
- serialized as a POJO, then the type is registered with the POJO serializer. If the
- type ends up being serialized with Kryo, then it will be registered at Kryo to make
- sure that only tags are written.
-
- Example:
- ::
-
- >>> env.register_type("com.aaa.bbb.TypeClass")
-
- :param type_class_name: The full-qualified java class name of the type to register.
-
- .. note:: Deprecated since version 1.19: Register data types and serializers through hard
- codes is deprecated, because you need to modify the codes when upgrading job
- version. You should configure this by option `pipeline.serialization-config`.
- """
- warnings.warn("Deprecated since version 1.19: Register data types and serializers through"
- " hard codes is deprecated, because you need to modify the codes when"
- " upgrading job version. You should configure this by config option "
- " 'pipeline.serialization-config'.", DeprecationWarning)
- type_clz = load_java_class(type_class_name)
- self._j_stream_execution_environment.registerType(type_clz)
-
def set_stream_time_characteristic(self, characteristic: TimeCharacteristic):
"""
Sets the time characteristic for all streams create from this environment, e.g., processing
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 08d4c02122ce0..a70d7cb983cbc 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -76,38 +76,6 @@ def test_get_set_default_local_parallelism(self):
self.assertEqual(parallelism, 8)
- def test_add_default_kryo_serializer(self):
- self.env.add_default_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.env.get_config().get_default_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_type_with_kryo_serializer(self):
- self.env.register_type_with_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.env.get_config().get_registered_types_with_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_type(self):
- self.env.register_type("org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
- type_list = self.env.get_config().get_registered_pojo_types()
-
- self.assertEqual(type_list,
- ['org.apache.flink.runtime.state.StateBackendTestBase$TestPojo'])
-
def test_get_set_max_parallelism(self):
self.env.set_max_parallelism(12)
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index d44e3e8d95260..6f5596bd739a3 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -786,18 +786,6 @@ def get_options(self):
"""
return dict(self._j_catalog_base_table.getOptions())
- def get_schema(self) -> TableSchema:
- """
- Get the schema of the table.
-
- :return: Schema of the table/view.
-
- . note:: Deprecated in 1.14. This method returns the deprecated TableSchema class. The old
- class was a hybrid of resolved and unresolved schema information. It has been replaced by
- the new Schema which is always unresolved and will be resolved by the framework later.
- """
- return TableSchema(j_table_schema=self._j_catalog_base_table.getSchema())
-
def get_unresolved_schema(self) -> Schema:
"""
Returns the schema of the table or view.
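The removed `get_schema` returned the deprecated `TableSchema`; its note names the new, always-unresolved `Schema` as the replacement. A minimal sketch of building one and reading it back (the catalog lookup is illustrative):

```python
from pyflink.table import DataTypes, Schema

# The replacement Schema is unresolved; the framework resolves it later.
schema = (Schema.new_builder()
          .column("id", DataTypes.BIGINT())
          .column("name", DataTypes.STRING())
          .build())

# On a CatalogBaseTable fetched from a catalog (illustrative lookup):
#   table = catalog.get_table(ObjectPath("some_db", "some_table"))
#   schema = table.get_unresolved_schema()  # instead of table.get_schema()
```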
diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py
index 04af453230127..9ed2e4a973adb 100644
--- a/flink-python/pyflink/table/environment_settings.py
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import warnings
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import create_url_class_loader
@@ -166,18 +165,6 @@ def is_streaming_mode(self) -> bool:
"""
return self._j_environment_settings.isStreamingMode()
- def to_configuration(self) -> Configuration:
- """
- Convert to `pyflink.common.Configuration`.
-
- :return: Configuration with specified value.
-
- .. note:: Deprecated in 1.15. Please use
- :func:`EnvironmentSettings.get_configuration` instead.
- """
- warnings.warn("Deprecated in 1.15.", DeprecationWarning)
- return Configuration(j_configuration=self._j_environment_settings.toConfiguration())
-
def get_configuration(self) -> Configuration:
"""
Get the underlying `pyflink.common.Configuration`.
@@ -195,24 +182,6 @@ def new_instance() -> 'EnvironmentSettings.Builder':
"""
return EnvironmentSettings.Builder()
- @staticmethod
- def from_configuration(config: Configuration) -> 'EnvironmentSettings':
- """
- Creates the EnvironmentSetting with specified Configuration.
-
- :return: EnvironmentSettings.
-
- .. note:: Deprecated in 1.15. Please use
- :func:`EnvironmentSettings.Builder.with_configuration` instead.
- """
- warnings.warn("Deprecated in 1.15.", DeprecationWarning)
- gateway = get_gateway()
- context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
- new_classloader = create_url_class_loader([], context_classloader)
- gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
- return EnvironmentSettings(
- get_gateway().jvm.EnvironmentSettings.fromConfiguration(config._j_configuration))
-
@staticmethod
def in_streaming_mode() -> 'EnvironmentSettings':
"""
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
deleted file mode 100644
index e6091ab026500..0000000000000
--- a/flink-python/pyflink/table/sinks.py
+++ /dev/null
@@ -1,78 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_data_type
-from pyflink.util import java_utils
-
-__all__ = ['TableSink', 'CsvTableSink', 'WriteMode']
-
-
-class TableSink(object):
- """
- A :class:`TableSink` specifies how to emit a table to an external system or location.
- """
-
- def __init__(self, j_table_sink):
- self._j_table_sink = j_table_sink
-
-
-class WriteMode(object):
- NO_OVERWRITE = 0
- OVERWRITE = 1
-
-
-class CsvTableSink(TableSink):
- """
- A simple :class:`TableSink` to emit data as CSV files.
-
- Example:
- ::
-
- >>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
- ... "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
-
- :param field_names: The list of field names.
- :param field_types: The list of field data types.
- :param path: The output path to write the Table to.
- :param field_delimiter: The field delimiter.
- :param num_files: The number of files to write to.
- :param write_mode: The write mode to specify whether existing files are overwritten or not,
- which contains: :data:`WriteMode.NO_OVERWRITE`
- and :data:`WriteMode.OVERWRITE`.
- """
-
- def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1,
- write_mode=None):
- gateway = get_gateway()
- if write_mode == WriteMode.NO_OVERWRITE:
- j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE
- elif write_mode == WriteMode.OVERWRITE:
- j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE
- elif write_mode is None:
- j_write_mode = None
- else:
- raise Exception('Unsupported write_mode: %s' % write_mode)
- j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names)
- j_field_types = java_utils.to_jarray(
- gateway.jvm.DataType,
- [_to_java_data_type(field_type) for field_type in field_types])
- j_csv_table_sink = gateway.jvm.CsvTableSink(
- path, field_delimiter, num_files, j_write_mode, j_field_names, j_field_types)
-
- super(CsvTableSink, self).__init__(j_csv_table_sink)
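With `CsvTableSink` deleted, the descriptor-based API (see `create_table` later in this patch) covers the same ground. A sketch using the filesystem connector with the csv format; the path and table name are illustrative:

```python
from pyflink.table import (DataTypes, EnvironmentSettings, Schema,
                           TableDescriptor, TableEnvironment)

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

sink = (TableDescriptor.for_connector("filesystem")
        .schema(Schema.new_builder()
                .column("a", DataTypes.INT())
                .column("b", DataTypes.STRING())
                .build())
        .option("path", "/csv/file/path")
        .format("csv")
        .build())
table_env.create_table("my_csv_sink", sink)
# some_table.execute_insert("my_csv_sink").wait()
```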
diff --git a/flink-python/pyflink/table/sources.py b/flink-python/pyflink/table/sources.py
deleted file mode 100644
index 3e0df45e8eae2..0000000000000
--- a/flink-python/pyflink/table/sources.py
+++ /dev/null
@@ -1,119 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from pyflink.java_gateway import get_gateway
-from pyflink.table.types import _to_java_data_type
-__all__ = ['TableSource', 'CsvTableSource']
-
-
-class TableSource(object):
- """
- Defines a table from an external system or location.
- """
-
- def __init__(self, j_table_source):
- self._j_table_source = j_table_source
-
-
-class CsvTableSource(TableSource):
- """
- A :class:`TableSource` for simple CSV files with a
- (logically) unlimited number of fields.
-
- Example:
- ::
-
- >>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
-
- :param source_path: The path to the CSV file.
- :type source_path: str
- :param field_names: The names of the table fields.
- :type field_names: collections.Iterable[str]
- :param field_types: The types of the table fields.
- :type field_types: collections.Iterable[str]
- :param field_delim: The field delimiter, "," by default.
- :type field_delim: str, optional
- :param line_delim: The row delimiter, "\\n" by default.
- :type line_delim: str, optional
- :param quote_character: An optional quote character for String values, null by default.
- :type quote_character: str, optional
- :param ignore_first_line: Flag to ignore the first line, false by default.
- :type ignore_first_line: bool, optional
- :param ignore_comments: An optional prefix to indicate comments, null by default.
- :type ignore_comments: str, optional
- :param lenient: Flag to skip records with parse error instead to fail, false by default.
- :type lenient: bool, optional
- :param empty_column_as_null: Treat empty column as null, false by default.
- :type empty_column_as_null: bool, optional
- """
-
- def __init__(
- self,
- source_path,
- field_names,
- field_types,
- field_delim=None,
- line_delim=None,
- quote_character=None,
- ignore_first_line=None,
- ignore_comments=None,
- lenient=None,
- empty_column_as_null=None,
- ):
- gateway = get_gateway()
-
- builder = gateway.jvm.CsvTableSource.builder()
- builder.path(source_path)
-
- for (field_name, field_type) in zip(field_names, field_types):
- builder.field(field_name, _to_java_data_type(field_type))
-
- if field_delim is not None:
- builder.fieldDelimiter(field_delim)
-
- if line_delim is not None:
- builder.lineDelimiter(line_delim)
-
- if quote_character is not None:
- # Java API has a Character type for this field. At time of writing,
- # Py4J will convert the Python str to Java Character by taking only
- # the first character. This results in either:
- # - Silently truncating a Python str with more than one character
- # with no further type error from either Py4J or Java
- # CsvTableSource
- # - java.lang.StringIndexOutOfBoundsException from Py4J for an
- # empty Python str. That error can be made more friendly here.
- if len(quote_character) != 1:
- raise ValueError(
- "Expected a single CSV quote character but got '{}'".format(quote_character)
- )
- builder.quoteCharacter(quote_character)
-
- if ignore_first_line:
- builder.ignoreFirstLine()
-
- if ignore_comments is not None:
- builder.commentPrefix(ignore_comments)
-
- if lenient:
- builder.ignoreParseErrors()
-
- if empty_column_as_null:
- builder.emptyColumnAsNull()
-
- super(CsvTableSource, self).__init__(builder.build())
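The same descriptor route replaces the deleted `CsvTableSource`: register a filesystem/csv descriptor and read it back with `from_path` (kept in table_environment.py below). A self-contained sketch with illustrative names:

```python
from pyflink.table import (DataTypes, EnvironmentSettings, Schema,
                           TableDescriptor, TableEnvironment)

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

source = (TableDescriptor.for_connector("filesystem")
          .schema(Schema.new_builder()
                  .column("a", DataTypes.INT())
                  .column("b", DataTypes.STRING())
                  .build())
          .option("path", "/csv/file/path")
          .format("csv")
          .build())
table_env.create_table("my_csv_source", source)
tab = table_env.from_path("my_csv_source")
```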
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index e1be145465b67..4940b76f75edf 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -19,14 +19,12 @@
import os
import sys
import tempfile
-import warnings
from typing import Union, List, Tuple, Iterable
from py4j.java_gateway import get_java_class, get_method
from pyflink.common.configuration import Configuration
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.sources import TableSource
from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.data_stream import DataStream
@@ -47,7 +45,6 @@
from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
udtaf, TableAggregateFunction
from pyflink.table.utils import to_expression_jarray
-from pyflink.util import java_utils
from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
to_j_explain_detail_arr, to_jarray, get_field
@@ -120,23 +117,6 @@ def create(environment_settings: Union[EnvironmentSettings, Configuration]) \
j_tenv = gateway.jvm.TableEnvironment.create(environment_settings._j_environment_settings)
return TableEnvironment(j_tenv)
- def from_table_source(self, table_source: 'TableSource') -> 'Table':
- """
- Creates a table from a table source.
-
- Example:
- ::
-
- >>> csv_table_source = CsvTableSource(
- ... csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
- >>> table_env.from_table_source(csv_table_source)
-
- :param table_source: The table source used as table.
- :return: The result table.
- """
- warnings.warn("Deprecated in 1.11.", DeprecationWarning)
- return Table(self._j_tenv.fromTableSource(table_source._j_table_source), self)
-
def register_catalog(self, catalog_name: str, catalog: Catalog):
"""
Registers a :class:`~pyflink.table.catalog.Catalog` under a unique name.
@@ -480,58 +460,6 @@ def create_table(self, path: str, descriptor: TableDescriptor):
"""
self._j_tenv.createTable(path, descriptor._j_table_descriptor)
- def register_table(self, name: str, table: Table):
- """
- Registers a :class:`~pyflink.table.Table` under a unique name in the TableEnvironment's
- catalog. Registered tables can be referenced in SQL queries.
-
- Example:
- ::
-
- >>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
- >>> table_env.register_table("source", tab)
-
- :param name: The name under which the table will be registered.
- :param table: The table to register.
-
- .. note:: Deprecated in 1.10. Use :func:`create_temporary_view` instead.
- """
- warnings.warn("Deprecated in 1.10. Use create_temporary_view instead.", DeprecationWarning)
- self._j_tenv.registerTable(name, table._j_table)
-
- def scan(self, *table_path: str) -> Table:
- """
- Scans a registered table and returns the resulting :class:`~pyflink.table.Table`.
- A table to scan must be registered in the TableEnvironment. It can be either directly
- registered or be an external member of a :class:`~pyflink.table.catalog.Catalog`.
-
- See the documentation of :func:`~pyflink.table.TableEnvironment.use_database` or
- :func:`~pyflink.table.TableEnvironment.use_catalog` for the rules on the path resolution.
-
- Examples:
-
- Scanning a directly registered table
- ::
-
- >>> tab = table_env.scan("tableName")
-
- Scanning a table from a registered catalog
- ::
-
- >>> tab = table_env.scan("catalogName", "dbName", "tableName")
-
- :param table_path: The path of the table to scan.
- :throws: Exception if no table is found using the given table path.
- :return: The resulting table.
-
- .. note:: Deprecated in 1.10. Use :func:`from_path` instead.
- """
- warnings.warn("Deprecated in 1.10. Use from_path instead.", DeprecationWarning)
- gateway = get_gateway()
- j_table_paths = java_utils.to_jarray(gateway.jvm.String, table_path)
- j_table = self._j_tenv.scan(j_table_paths)
- return Table(j_table, self)
-
def from_path(self, path: str) -> Table:
"""
Reads a registered table and returns the resulting :class:`~pyflink.table.Table`.
@@ -933,86 +861,6 @@ def get_config(self) -> TableConfig:
setattr(self, "table_config", table_config)
return getattr(self, "table_config")
- def register_java_function(self, name: str, function_class_name: str):
- """
- Registers a java user defined function under a unique name. Replaces already existing
- user-defined functions under this name. The acceptable function type contains
- **ScalarFunction**, **TableFunction** and **AggregateFunction**.
-
- Example:
- ::
-
- >>> table_env.register_java_function("func1", "java.user.defined.function.class.name")
-
- :param name: The name under which the function is registered.
- :param function_class_name: The java full qualified class name of the function to register.
- The function must have a public no-argument constructor and can
- be founded in current Java classloader.
-
- .. note:: Deprecated in 1.12. Use :func:`create_java_temporary_system_function` instead.
- """
- warnings.warn("Deprecated in 1.12. Use :func:`create_java_temporary_system_function` "
- "instead.", DeprecationWarning)
- gateway = get_gateway()
- java_function = gateway.jvm.Thread.currentThread().getContextClassLoader()\
- .loadClass(function_class_name).newInstance()
- # this is a temporary solution and will be unified later when we use the new type
- # system(DataType) to replace the old type system(TypeInformation).
- if not isinstance(self, StreamTableEnvironment) or self.__class__ == TableEnvironment:
- if self._is_table_function(java_function):
- self._register_table_function(name, java_function)
- elif self._is_aggregate_function(java_function):
- self._register_aggregate_function(name, java_function)
- else:
- self._j_tenv.registerFunction(name, java_function)
- else:
- self._j_tenv.registerFunction(name, java_function)
-
- def register_function(self, name: str, function: UserDefinedFunctionWrapper):
- """
- Registers a python user-defined function under a unique name. Replaces already existing
- user-defined function under this name.
-
- Example:
- ::
-
- >>> table_env.register_function(
- ... "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))
-
- >>> @udf(result_type=DataTypes.BIGINT())
- ... def add(i, j):
- ... return i + j
- >>> table_env.register_function("add", add)
-
- >>> class SubtractOne(ScalarFunction):
- ... def eval(self, i):
- ... return i - 1
- >>> table_env.register_function(
- ... "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
-
- :param name: The name under which the function is registered.
- :param function: The python user-defined function to register.
-
- .. versionadded:: 1.10.0
-
- .. note:: Deprecated in 1.12. Use :func:`create_temporary_system_function` instead.
- """
- warnings.warn("Deprecated in 1.12. Use :func:`create_temporary_system_function` "
- "instead.", DeprecationWarning)
- function = self._wrap_aggregate_function_if_needed(function)
- java_function = function._java_user_defined_function()
- # this is a temporary solution and will be unified later when we use the new type
- # system(DataType) to replace the old type system(TypeInformation).
- if self.__class__ == TableEnvironment:
- if self._is_table_function(java_function):
- self._register_table_function(name, java_function)
- elif self._is_aggregate_function(java_function):
- self._register_aggregate_function(name, java_function)
- else:
- self._j_tenv.registerFunction(name, java_function)
- else:
- self._j_tenv.registerFunction(name, java_function)
-
def create_temporary_view(self,
view_path: str,
table_or_data_stream: Union[Table, DataStream],
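The removed docstrings in this file each name their successor: `create_temporary_view` for `register_table` and `from_path` for `scan`. A short sketch of both:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

# Replacement for the removed register_table("source", tab):
table_env.create_temporary_view("source", tab)

# Replacement for the removed scan("source"):
result = table_env.from_path("source")
```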
diff --git a/flink-python/pyflink/table/tests/test_catalog.py b/flink-python/pyflink/table/tests/test_catalog.py
index dcc1bc7d9cea2..2c864a1a18b1e 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -56,12 +56,10 @@ def check_catalog_database_equals(self, cd1, cd2):
self.assertEqual(cd1.get_properties(), cd2.get_properties())
def check_catalog_table_equals(self, t1, t2):
- self.assertEqual(t1.get_schema(), t2.get_schema())
self.assertEqual(t1.get_options(), t2.get_options())
self.assertEqual(t1.get_comment(), t2.get_comment())
def check_catalog_view_equals(self, v1, v2):
- self.assertEqual(v1.get_schema(), v1.get_schema())
self.assertEqual(v1.get_options(), v2.get_options())
self.assertEqual(v1.get_comment(), v2.get_comment())
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 9ea45ee7370a8..90e49653b60a1 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -627,8 +627,8 @@ def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, dete
def udf(f: Union[Callable, ScalarFunction, Type] = None,
input_types: Union[List[DataType], DataType, str, List[str]] = None,
result_type: Union[DataType, str] = None,
- deterministic: bool = None, name: str = None, func_type: str = "general",
- udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
+ deterministic: bool = None, name: str = None, func_type: str = "general"
+ ) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
"""
Helper method for creating a user-defined function.
@@ -667,10 +667,6 @@ def udf(f: Union[Callable, ScalarFunction, Type] = None,
.. versionadded:: 1.10.0
"""
- if udf_type:
- import warnings
- warnings.warn("The param udf_type is deprecated in 1.12. Use func_type instead.")
- func_type = udf_type
if func_type not in ('general', 'pandas'):
raise ValueError("The func_type must be one of 'general, pandas', got %s."
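After this change, callers that passed `udf_type` must pass `func_type` instead, as the removed warning already advised. A sketch of a pandas UDF under the surviving parameter:

```python
import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# func_type replaces the removed udf_type parameter.
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def pandas_add(i: pd.Series, j: pd.Series) -> pd.Series:
    return i + j
```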