From 2438d77257be09e0d20e25d19348c3afa890854b Mon Sep 17 00:00:00 2001 From: GoodBoyCoder Date: Thu, 9 Jan 2025 16:10:16 +0800 Subject: [PATCH] feature: add fury serializer support (#7038) --- all/pom.xml | 5 ++ changes/en-us/2.x.md | 3 +- changes/zh-cn/2.x.md | 3 +- .../serializer/SerializerServiceLoader.java | 3 +- .../seata/core/serializer/SerializerType.java | 10 ++- serializer/pom.xml | 1 + serializer/seata-serializer-all/pom.xml | 5 ++ serializer/seata-serializer-fury/pom.xml | 46 +++++++++++ .../seata/serializer/fury/FurySerializer.java | 43 ++++++++++ .../fury/FurySerializerFactory.java | 53 ++++++++++++ ...rg.apache.seata.core.serializer.Serializer | 17 ++++ .../serializer/fury/FurySerializerTest.java | 81 +++++++++++++++++++ 12 files changed, 266 insertions(+), 4 deletions(-) create mode 100644 serializer/seata-serializer-fury/pom.xml create mode 100644 serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializer.java create mode 100644 serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializerFactory.java create mode 100644 serializer/seata-serializer-fury/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer create mode 100644 serializer/seata-serializer-fury/src/test/java/org/apache/seata/serializer/fury/FurySerializerTest.java diff --git a/all/pom.xml b/all/pom.xml index 8148247ed4b..867ec15c552 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -262,6 +262,11 @@ seata-hsf ${project.version} + + org.apache.seata + seata-serializer-fury + ${project.version} + org.apache.seata seata-serializer-kryo diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index b1afbb58d16..a2ea0b0b4a7 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation +- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] support fury serializer ### bugfix: @@ -36,9 +37,9 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) -- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) - [funky-eyes](https://github.com/funky-eyes) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0cf918bf87a..0c4892049b1 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -6,6 +6,7 @@ - [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 +- [[#7038](https://github.com/apache/incubator-seata/pull/7038)] 支持Fury序列化器 ### bugfix: @@ -36,9 +37,9 @@ - [slievrly](https://github.com/slievrly) -- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [PeppaO](https://github.com/PeppaO) - [funky-eyes](https://github.com/funky-eyes) diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index 359867b5104..ed3e7e3b8e4 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import static org.apache.seata.core.serializer.SerializerType.FASTJSON2; +import static org.apache.seata.core.serializer.SerializerType.FURY; import static org.apache.seata.core.serializer.SerializerType.HESSIAN; import static org.apache.seata.core.serializer.SerializerType.KRYO; import static org.apache.seata.core.serializer.SerializerType.PROTOBUF; @@ -47,7 +48,7 @@ public final class SerializerServiceLoader { private static final Logger LOGGER = LoggerFactory.getLogger(SerializerServiceLoader.class); private static final Configuration CONFIG = ConfigurationFactory.getInstance(); - private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[]{SEATA, PROTOBUF, KRYO, HESSIAN, FASTJSON2}; + private static final SerializerType[] DEFAULT_SERIALIZER_TYPE = new SerializerType[]{SEATA, PROTOBUF, KRYO, HESSIAN, FASTJSON2, FURY}; private final static Map SERIALIZER_MAP = new HashMap<>(); diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index 39772b34ce0..7112f8a32a8 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -77,7 +77,15 @@ public enum SerializerType { *

* Math.pow(2, 7) */ - GRPC((byte) 0x128); + GRPC((byte) 0x128), + + /** + * The fury. + *

+ * Math.pow(2, 8) + */ + FURY((byte) 0x256) + ; private final byte code; diff --git a/serializer/pom.xml b/serializer/pom.xml index 246ba6c2757..e4435a724d1 100644 --- a/serializer/pom.xml +++ b/serializer/pom.xml @@ -38,6 +38,7 @@ seata-serializer-kryo seata-serializer-hessian seata-serializer-fastjson2 + seata-serializer-fury diff --git a/serializer/seata-serializer-all/pom.xml b/serializer/seata-serializer-all/pom.xml index a0d816153bf..d34e9f6b14e 100644 --- a/serializer/seata-serializer-all/pom.xml +++ b/serializer/seata-serializer-all/pom.xml @@ -55,5 +55,10 @@ seata-serializer-fastjson2 ${project.version} + + ${project.groupId} + seata-serializer-fury + ${project.version} + diff --git a/serializer/seata-serializer-fury/pom.xml b/serializer/seata-serializer-fury/pom.xml new file mode 100644 index 00000000000..18a40ac2026 --- /dev/null +++ b/serializer/seata-serializer-fury/pom.xml @@ -0,0 +1,46 @@ + + + + + org.apache.seata + seata-serializer + ${revision} + + 4.0.0 + seata-serializer-fury + jar + seata-serializer-fury ${project.version} + serializer-fury for Seata built with Maven + + + + ${project.groupId} + seata-core + ${project.version} + + + + org.apache.fury + fury-core + + + \ No newline at end of file diff --git a/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializer.java b/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializer.java new file mode 100644 index 00000000000..1f39a6dbba5 --- /dev/null +++ b/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.fury; + +import org.apache.fury.ThreadSafeFury; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.protocol.AbstractMessage; +import org.apache.seata.core.serializer.Serializer; + +@LoadLevel(name = "FURY") +public class FurySerializer implements Serializer { + @Override + public byte[] serialize(T t) { + if (!(t instanceof AbstractMessage)) { + throw new IllegalArgumentException("AbstractMessage isn't available."); + } + ThreadSafeFury threadSafeFury = FurySerializerFactory.getInstance().get(); + return threadSafeFury.serialize(t); + } + + @Override + public T deserialize(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + throw new IllegalArgumentException("bytes is null"); + } + ThreadSafeFury threadSafeFury = FurySerializerFactory.getInstance().get(); + return (T) threadSafeFury.deserialize(bytes); + } +} diff --git a/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializerFactory.java b/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializerFactory.java new file mode 100644 index 00000000000..806e29878bb --- /dev/null +++ b/serializer/seata-serializer-fury/src/main/java/org/apache/seata/serializer/fury/FurySerializerFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.fury; + +import org.apache.fury.Fury; +import org.apache.fury.ThreadLocalFury; +import org.apache.fury.ThreadSafeFury; +import org.apache.fury.config.CompatibleMode; +import org.apache.fury.config.Language; +import org.apache.seata.core.serializer.SerializerSecurityRegistry; + +public class FurySerializerFactory { + private static final FurySerializerFactory FACTORY = new FurySerializerFactory(); + + private static final ThreadSafeFury FURY = new ThreadLocalFury(classLoader -> { + Fury f = Fury.builder() + .withLanguage(Language.JAVA) + // In JAVA mode, classes cannot be registered by tag, and the different registration order between the server and the client will cause deserialization failure + // In XLANG cross-language mode has problems with Java class serialization, such as enum classes [https://github.com/apache/fury/issues/1644]. + .requireClassRegistration(false) + //enable reference tracking for shared/circular reference. + .withRefTracking(true) + .withClassLoader(classLoader) + .withCompatibleMode(CompatibleMode.COMPATIBLE) + .build(); + + // register allow class + f.getClassResolver().setClassChecker((classResolver,className) -> SerializerSecurityRegistry.getAllowClassPattern().contains(className)); + return f; + }); + + public static FurySerializerFactory getInstance() { + return FACTORY; + } + + public ThreadSafeFury get() { + return FURY; + } +} diff --git a/serializer/seata-serializer-fury/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-fury/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer new file mode 100644 index 00000000000..0e125a30674 --- /dev/null +++ b/serializer/seata-serializer-fury/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.serializer.fury.FurySerializer \ No newline at end of file diff --git a/serializer/seata-serializer-fury/src/test/java/org/apache/seata/serializer/fury/FurySerializerTest.java b/serializer/seata-serializer-fury/src/test/java/org/apache/seata/serializer/fury/FurySerializerTest.java new file mode 100644 index 00000000000..2f5aad11de2 --- /dev/null +++ b/serializer/seata-serializer-fury/src/test/java/org/apache/seata/serializer/fury/FurySerializerTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.fury; + +import org.apache.seata.core.exception.TransactionExceptionCode; +import org.apache.seata.core.model.BranchStatus; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.BranchCommitRequest; +import org.apache.seata.core.protocol.transaction.BranchCommitResponse; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class FurySerializerTest { + private static FurySerializer furySerializer; + + @BeforeAll + public static void before() { + furySerializer = new FurySerializer(); + } + + @Test + public void testBranchCommitRequest() { + + BranchCommitRequest branchCommitRequest = new BranchCommitRequest(); + branchCommitRequest.setBranchType(BranchType.AT); + branchCommitRequest.setXid("xid"); + branchCommitRequest.setResourceId("resourceId"); + branchCommitRequest.setBranchId(20190809); + branchCommitRequest.setApplicationData("app"); + + byte[] bytes = furySerializer.serialize(branchCommitRequest); + BranchCommitRequest t = furySerializer.deserialize(bytes); + + assertThat(t.getTypeCode()).isEqualTo(branchCommitRequest.getTypeCode()); + assertThat(t.getBranchType()).isEqualTo(branchCommitRequest.getBranchType()); + assertThat(t.getXid()).isEqualTo(branchCommitRequest.getXid()); + assertThat(t.getResourceId()).isEqualTo(branchCommitRequest.getResourceId()); + assertThat(t.getBranchId()).isEqualTo(branchCommitRequest.getBranchId()); + assertThat(t.getApplicationData()).isEqualTo(branchCommitRequest.getApplicationData()); + + } + + @Test + public void testBranchCommitResponse() { + + BranchCommitResponse branchCommitResponse = new BranchCommitResponse(); + branchCommitResponse.setTransactionExceptionCode(TransactionExceptionCode.BranchTransactionNotExist); + branchCommitResponse.setBranchId(20190809); + branchCommitResponse.setBranchStatus(BranchStatus.PhaseOne_Done); + branchCommitResponse.setMsg("20190809"); + branchCommitResponse.setXid("20190809"); + branchCommitResponse.setResultCode(ResultCode.Failed); + + byte[] bytes = furySerializer.serialize(branchCommitResponse); + BranchCommitResponse t = furySerializer.deserialize(bytes); + + assertThat(t.getTransactionExceptionCode()).isEqualTo(branchCommitResponse.getTransactionExceptionCode()); + assertThat(t.getBranchId()).isEqualTo(branchCommitResponse.getBranchId()); + assertThat(t.getBranchStatus()).isEqualTo(branchCommitResponse.getBranchStatus()); + assertThat(t.getMsg()).isEqualTo(branchCommitResponse.getMsg()); + assertThat(t.getResultCode()).isEqualTo(branchCommitResponse.getResultCode()); + + } +}