Skip to content

Commit

Permalink
[FLINK-36307][python] Remove deprecated API in PyFlink module
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Sep 24, 2024
1 parent c635580 commit 4b356e5
Show file tree
Hide file tree
Showing 15 changed files with 5 additions and 774 deletions.
40 changes: 0 additions & 40 deletions docs/content.zh/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,46 +435,6 @@ TableEnvironment API
</tbody>
</table>

<big><strong>废弃的 APIs</strong></big>

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">APIs</th>
<th class="text-center">描述</th>
<th class="text-center" style="width: 10%">文档</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<strong>register_function(name, function)</strong>
</td>
<td>
注册一个 Python 用户自定义函数,并为其指定一个唯一的名称。
若已有与该名称相同的用户自定义函数,则替换之。
它可以通过 <strong>create_temporary_system_function</strong> 来替换。
</td>
<td class="text-center">
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_function" name="链接">}}
</td>
</tr>
<tr>
<td>
<strong>register_java_function(name, function_class_name)</strong>
</td>
<td>
注册一个 Java 用户自定义函数,并为其指定一个唯一的名称。
若已有与该名称相同的用户自定义函数,则替换之。
它可以通过 <strong>create_java_temporary_system_function</strong> 来替换。
</td>
<td class="text-center">
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_java_function" name="链接">}}
</td>
</tr>
</tbody>
</table>

### 依赖管理

这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。
Expand Down
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum

# 注册函数
t_env = ... # type: StreamTableEnvironment
t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
t_env.create_java_temporary_function("wAvg", "my.java.function.WeightedAvg")

# 使用函数
t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
Expand Down
40 changes: 0 additions & 40 deletions docs/content/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,46 +439,6 @@ For more details about the different kinds of UDFs, please refer to [User Define
</tbody>
</table>

<big><strong>Deprecated APIs</strong></big>

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">APIs</th>
<th class="text-center">Description</th>
<th class="text-center" style="width: 10%">Docs</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<strong>register_function(name, function)</strong>
</td>
<td>
Registers a Python user-defined function under a unique name.
Replaces already existing user-defined function under this name.
It can be replaced by <strong>create_temporary_system_function</strong>.
</td>
<td class="text-center">
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_function" name="link">}}
</td>
</tr>
<tr>
<td>
<strong>register_java_function(name, function_class_name)</strong>
</td>
<td>
Registers a Java user defined function under a unique name.
Replaces already existing user-defined functions under this name.
It can be replaced by <strong>create_java_temporary_system_function</strong>.
</td>
<td class="text-center">
{{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.register_java_function" name="link">}}
</td>
</tr>
</tbody>
</table>

### Dependency Management

These APIs are used to manage the Python dependencies which are required by the Python UDFs.
Expand Down
1 change: 1 addition & 0 deletions docs/content/docs/dev/table/data_stream_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ env = StreamExecutionEnvironment.get_execution_environment()
# set various configuration early
env.set_max_parallelism(256)

env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name")
env.get_config().add_default_kryo_serializer("type_class_name", "serializer_class_name")

env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
Expand Down
134 changes: 0 additions & 134 deletions flink-python/pyflink/common/execution_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings

from typing import Dict, List

from pyflink.common.execution_mode import ExecutionMode
from pyflink.common.input_dependency_constraint import InputDependencyConstraint
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import load_java_class

__all__ = ['ExecutionConfig']

Expand Down Expand Up @@ -284,58 +281,6 @@ def get_execution_mode(self) -> 'ExecutionMode':
j_execution_mode = self._j_execution_config.getExecutionMode()
return ExecutionMode._from_j_execution_mode(j_execution_mode)

def set_default_input_dependency_constraint(
self, input_dependency_constraint: InputDependencyConstraint) -> 'ExecutionConfig':
"""
Sets the default input dependency constraint for vertex scheduling. It indicates when a
task should be scheduled considering its inputs status.
The default constraint is :data:`InputDependencyConstraint.ANY`.
Example:
::
>>> config.set_default_input_dependency_constraint(InputDependencyConstraint.ALL)
:param input_dependency_constraint: The input dependency constraint. The constraints could
be :data:`InputDependencyConstraint.ANY` or
:data:`InputDependencyConstraint.ALL`.
.. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
current scheduler implementations.
"""
warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
"Therefore, the method call set_default_input_dependency_constraint is "
"obsolete.", DeprecationWarning)

self._j_execution_config.setDefaultInputDependencyConstraint(
input_dependency_constraint._to_j_input_dependency_constraint())
return self

def get_default_input_dependency_constraint(self) -> 'InputDependencyConstraint':
"""
Gets the default input dependency constraint for vertex scheduling. It indicates when a
task should be scheduled considering its inputs status.
The default constraint is :data:`InputDependencyConstraint.ANY`.
.. seealso:: :func:`set_default_input_dependency_constraint`
:return: The input dependency constraint of this job. The possible constraints are
:data:`InputDependencyConstraint.ANY` and :data:`InputDependencyConstraint.ALL`.
.. note:: Deprecated in 1.13. :class:`InputDependencyConstraint` is not used anymore in the
current scheduler implementations.
"""
warnings.warn("Deprecated in 1.13. InputDependencyConstraint is not used anywhere. "
"Therefore, the method call get_default_input_dependency_constraint is "
"obsolete.", DeprecationWarning)

j_input_dependency_constraint = self._j_execution_config\
.getDefaultInputDependencyConstraint()
return InputDependencyConstraint._from_j_input_dependency_constraint(
j_input_dependency_constraint)

def enable_force_kryo(self) -> 'ExecutionConfig':
"""
Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
Expand Down Expand Up @@ -532,85 +477,6 @@ def set_global_job_parameters(self, global_job_parameters_dict: Dict) -> 'Execut
self._j_execution_config.setGlobalJobParameters(j_global_job_parameters)
return self

def add_default_kryo_serializer(self,
type_class_name: str,
serializer_class_name: str) -> 'ExecutionConfig':
"""
Adds a new Kryo default serializer to the Runtime.
Example:
::
>>> config.add_default_kryo_serializer("com.aaa.bbb.PojoClass",
... "com.aaa.bbb.Serializer")
:param type_class_name: The full-qualified java class name of the types serialized with the
given serializer.
:param serializer_class_name: The full-qualified java class name of the serializer to use.
"""
type_clz = load_java_class(type_class_name)
j_serializer_clz = load_java_class(serializer_class_name)
self._j_execution_config.addDefaultKryoSerializer(type_clz, j_serializer_clz)
return self

def register_type_with_kryo_serializer(self,
type_class_name: str,
serializer_class_name: str) -> 'ExecutionConfig':
"""
Registers the given Serializer via its class as a serializer for the given type at the
KryoSerializer.
Example:
::
>>> config.register_type_with_kryo_serializer("com.aaa.bbb.PojoClass",
... "com.aaa.bbb.Serializer")
:param type_class_name: The full-qualified java class name of the types serialized with
the given serializer.
:param serializer_class_name: The full-qualified java class name of the serializer to use.
"""
type_clz = load_java_class(type_class_name)
j_serializer_clz = load_java_class(serializer_class_name)
self._j_execution_config.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)
return self

def register_pojo_type(self, type_class_name: str) -> 'ExecutionConfig':
"""
Registers the given type with the serialization stack. If the type is eventually
serialized as a POJO, then the type is registered with the POJO serializer. If the
type ends up being serialized with Kryo, then it will be registered at Kryo to make
sure that only tags are written.
Example:
::
>>> config.register_pojo_type("com.aaa.bbb.PojoClass")
:param type_class_name: The full-qualified java class name of the type to register.
"""
type_clz = load_java_class(type_class_name)
self._j_execution_config.registerPojoType(type_clz)
return self

def register_kryo_type(self, type_class_name: str) -> 'ExecutionConfig':
"""
Registers the given type with the serialization stack. If the type is eventually
serialized as a POJO, then the type is registered with the POJO serializer. If the
type ends up being serialized with Kryo, then it will be registered at Kryo to make
sure that only tags are written.
Example:
::
>>> config.register_kryo_type("com.aaa.bbb.KryoClass")
:param type_class_name: The full-qualified java class name of the type to register.
"""
type_clz = load_java_class(type_class_name)
self._j_execution_config.registerKryoType(type_clz)
return self

def get_registered_types_with_kryo_serializer_classes(self) -> Dict[str, str]:
"""
Returns the registered types with their Kryo Serializer classes.
Expand Down
46 changes: 0 additions & 46 deletions flink-python/pyflink/common/tests/test_execution_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,52 +165,6 @@ def test_get_set_global_job_parameters(self):

self.assertEqual(self.execution_config.get_global_job_parameters(), {"hello": "world"})

def test_add_default_kryo_serializer(self):

self.execution_config.add_default_kryo_serializer(
"org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
"org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")

class_dict = self.execution_config.get_default_kryo_serializer_classes()

self.assertEqual(class_dict,
{'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
'org.apache.flink.runtime.state'
'.StateBackendTestBase$CustomKryoTestSerializer'})

def test_register_type_with_kryo_serializer(self):

self.execution_config.register_type_with_kryo_serializer(
"org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
"org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")

class_dict = self.execution_config.get_registered_types_with_kryo_serializer_classes()

self.assertEqual(class_dict,
{'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
'org.apache.flink.runtime.state'
'.StateBackendTestBase$CustomKryoTestSerializer'})

def test_register_pojo_type(self):

self.execution_config.register_pojo_type(
"org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")

type_list = self.execution_config.get_registered_pojo_types()

self.assertEqual(type_list,
["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])

def test_register_kryo_type(self):

self.execution_config.register_kryo_type(
"org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")

type_list = self.execution_config.get_registered_kryo_types()

self.assertEqual(type_list,
["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])

def test_auto_type_registration(self):

self.assertFalse(self.execution_config.is_auto_type_registration_disabled())
Expand Down
Loading

0 comments on commit 4b356e5

Please sign in to comment.