From 02b96df4a11b211caa94c356b6aaab3ab891101e Mon Sep 17 00:00:00 2001 From: songxinjianqwe Date: Sat, 25 Aug 2018 00:23:08 +0800 Subject: [PATCH] =?UTF-8?q?Refactor:Javassist=E5=8A=A8=E6=80=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E9=87=8D=E6=9E=84=E4=B8=BA=E5=8F=AF=E4=BB=A5=E6=8F=90?= =?UTF-8?q?=E5=8D=87=E6=95=88=E7=8E=87=E7=9A=84=E5=AE=9E=E7=8E=B0=EF=BC=8C?= =?UTF-8?q?=E4=BD=86=E7=9B=AE=E5=89=8D=E6=9C=89bug=EF=BC=8C=E4=B8=8D?= =?UTF-8?q?=E5=8F=AF=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../benchmark/base/client/AbstractClient.java | 30 +--- .../src/main/resources/application.properties | 2 +- .../src/main/resources/application.properties | 2 +- .../toy/common/domain/RPCRequest.java | 42 +++++- .../toy/common/enumeration/ErrorEnum.java | 2 +- .../common/enumeration/InvocationType.java | 29 +++- .../sinjinsong/toy/common/util/TypeUtil.java | 32 ++++ .../toy/invocation/api/Invocation.java | 10 +- .../api/support/AbstractInvocation.java | 44 ++---- .../toy/invocation/async/AsyncInvocation.java | 10 +- .../callback/CallbackInvocation.java | 24 +-- .../invocation/oneway/OneWayInvocation.java | 10 +- .../toy/invocation/sync/SyncInvocation.java | 11 +- .../protocol/api/support/AbstractInvoker.java | 30 +--- .../toy/proxy/JavassistRPCProxyFactory.java | 139 +++++++++++++++--- .../toy/proxy/JdkRPCProxyFactory.java | 5 +- .../api/support/AbstractRPCProxyFactory.java | 60 ++++---- .../transport/api/support/RPCTaskRunner.java | 16 +- 18 files changed, 314 insertions(+), 184 deletions(-) create mode 100644 toy-rpc-core/src/main/java/com/sinjinsong/toy/common/util/TypeUtil.java diff --git a/benchmark/benchmark-base/src/main/java/com/sinjinsong/benchmark/base/client/AbstractClient.java b/benchmark/benchmark-base/src/main/java/com/sinjinsong/benchmark/base/client/AbstractClient.java index 2966fa7..d75e060 100644 --- a/benchmark/benchmark-base/src/main/java/com/sinjinsong/benchmark/base/client/AbstractClient.java +++ b/benchmark/benchmark-base/src/main/java/com/sinjinsong/benchmark/base/client/AbstractClient.java @@ -39,32 +39,6 @@ public abstract class AbstractClient { protected abstract UserService getUserService(); -// ReferenceConfig config = ReferenceConfig.createReferenceConfig( -// BenchmarkService.class.getName(), -// BenchmarkService.class, -// false, -// false, -// false, -// 3000, -// "", -// 1, -// false, -// null -// ); -// AbstractRPCBeanPostProcessor.initConfig(ctx, config); -// List clients = new ArrayList<>(threads); -// for (int i = 0; i < threads; i++) { -// clients.add(config.getForBenchmark()); -// } -// List tasks = new ArrayList<>(); -// TestObject testObject = new TestObject(); -// for (int i = 0; i < threads; i++) { -// BenchmarkService client = clients.get(i); -// -// -// tasks.add(r); -// } - public void run(int threads, int requestsTotal, int measurementIterations) { this.threads = threads; this.requestsTotal = requestsTotal; @@ -72,10 +46,10 @@ public void run(int threads, int requestsTotal, int measurementIterations) { this.userService = getUserService(); this.executorService = Executors.newFixedThreadPool(threads); this.measurementIterations = measurementIterations; - createUser(); +// createUser(); // existUser(); // getUser(); -// listUser(); + listUser(); } @Data diff --git a/benchmark/toy-rpc-client/src/main/resources/application.properties b/benchmark/toy-rpc-client/src/main/resources/application.properties index c1db4c2..513e601 100644 --- a/benchmark/toy-rpc-client/src/main/resources/application.properties +++ b/benchmark/toy-rpc-client/src/main/resources/application.properties @@ -1,6 +1,6 @@ rpc.application.name=benchmark_consumer rpc.application.serialize=protostuff -rpc.application.proxy=jdk +rpc.application.proxy=javassist rpc.protocol.type=toy rpc.protocol.executor.server.type=threadpool rpc.protocol.executor.client.type=threadpool diff --git a/benchmark/toy-rpc-server/src/main/resources/application.properties b/benchmark/toy-rpc-server/src/main/resources/application.properties index 8c64e3d..5bfd44d 100644 --- a/benchmark/toy-rpc-server/src/main/resources/application.properties +++ b/benchmark/toy-rpc-server/src/main/resources/application.properties @@ -1,6 +1,6 @@ rpc.application.name=benchmark_provider rpc.application.serialize=protostuff -rpc.application.proxy=jdk +rpc.application.proxy=javassist rpc.protocol.type=toy rpc.protocol.port=8000 rpc.protocol.executor.server.type=threadpool diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/domain/RPCRequest.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/domain/RPCRequest.java index 6c0a4d4..d7c65cd 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/domain/RPCRequest.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/domain/RPCRequest.java @@ -1,5 +1,6 @@ package com.sinjinsong.toy.common.domain; +import com.sinjinsong.toy.common.util.TypeUtil; import io.netty.util.Recycler; import lombok.Data; @@ -8,15 +9,14 @@ /** * Created by SinjinSong on 2017/7/30. - * + *

* 几种使用场景: * 1)consumer发起请求时创建,将数据序列化传输到服务器后被回收(encoder)/injvm调用服务完毕后被回收 * 2)provider收到请求时创建,服务调用完毕后被回收(反序列化直接获得对象,如果是protostuff可以reuse,其他不可reuse,可以recycle) - * + *

* callback: * 1)provider进行服务调用时创建,数据序列化传输到客户端后被回收(encoder) * 2)consumer收到callback请求时被创建,callback调用完毕后被回收(反序列化直接获得对象,如果是protostuff可以reuse,其他不可reuse,可以recycle) - * */ @Data public class RPCRequest implements Serializable { @@ -24,8 +24,40 @@ public class RPCRequest implements Serializable { private String requestId; private String interfaceName; private String methodName; - private Class[] parameterTypes; + private String[] parameterTypes; private Object[] parameters; + private transient Class[] parameterTypeClasses; + + public void setParameterTypes(Class[] parameterTypes) { + this.parameterTypeClasses = parameterTypes; + String[] paramTypes = new String[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + paramTypes[i] = parameterTypes[i].getName(); + } + this.parameterTypes = paramTypes; + } + + public void setParameterTypes(String[] parameterTypes) { + this.parameterTypes = parameterTypes; + } + + public Class[] getParameterTypes() { + if (parameterTypeClasses == null) { + parameterTypeClasses = new Class[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + if (TypeUtil.isPrimitive(parameterTypes[i])) { + parameterTypeClasses[i] = TypeUtil.map(parameterTypes[i]); + }else { + try { + parameterTypeClasses[i] = Class.forName(parameterTypes[i]); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + } + } + return parameterTypeClasses; + } public RPCRequest(Recycler.Handle handle) { this.handle = handle; @@ -40,7 +72,7 @@ public String key() { .append(".") .append(Arrays.toString(parameters)).toString(); } - + public void recycle() { requestId = null; interfaceName = null; diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/ErrorEnum.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/ErrorEnum.java index e6ff464..1336113 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/ErrorEnum.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/ErrorEnum.java @@ -28,7 +28,7 @@ public enum ErrorEnum { PROTOCOL_CANNOT_FIND_THE_SERVER_ADDRESS("协议找不到该服务器地址"), HEART_BEAT_TIME_OUT_EXCEED("超过心跳超时时间"), CREATE_PROXY_ERROR("生成代理失败"), - RECYCLER_ERROR("对象复用失败") + RECYCLER_ERROR("对象复用失败"), ; private String errorCode; diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/InvocationType.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/InvocationType.java index d724d55..443f50c 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/InvocationType.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/enumeration/InvocationType.java @@ -1,9 +1,36 @@ package com.sinjinsong.toy.common.enumeration; +import com.sinjinsong.toy.common.util.InvokeParamUtil; +import com.sinjinsong.toy.config.ReferenceConfig; +import com.sinjinsong.toy.invocation.api.Invocation; +import com.sinjinsong.toy.invocation.async.AsyncInvocation; +import com.sinjinsong.toy.invocation.callback.CallbackInvocation; +import com.sinjinsong.toy.invocation.oneway.OneWayInvocation; +import com.sinjinsong.toy.invocation.sync.SyncInvocation; +import com.sinjinsong.toy.protocol.api.InvokeParam; + /** * @author sinjinsong * @date 2018/7/15 */ public enum InvocationType { - ONEWAY,SYNC,ASYNC,CALLBACK; + ONEWAY(new OneWayInvocation()),SYNC(new SyncInvocation()),ASYNC(new AsyncInvocation()),CALLBACK(new CallbackInvocation()); + private Invocation invocation; + + InvocationType(Invocation invocation) { + this.invocation = invocation; + } + + public static Invocation get(InvokeParam invokeParam) { + ReferenceConfig referenceConfig = InvokeParamUtil.extractReferenceConfigFromInvokeParam(invokeParam); + if (referenceConfig.isAsync()) { + return ASYNC.invocation; + } else if (referenceConfig.isCallback()) { + return CALLBACK.invocation; + } else if (referenceConfig.isOneWay()) { + return ONEWAY.invocation; + } else { + return SYNC.invocation; + } + } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/util/TypeUtil.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/util/TypeUtil.java new file mode 100644 index 0000000..5662822 --- /dev/null +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/common/util/TypeUtil.java @@ -0,0 +1,32 @@ +package com.sinjinsong.toy.common.util; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author sinjinsong + * @date 2018/8/24 + */ +public class TypeUtil { + private static Map CLASS_MAP = new HashMap<>(); + + static { + CLASS_MAP.put("int", int.class); + CLASS_MAP.put("float", float.class); + CLASS_MAP.put("double", double.class); + CLASS_MAP.put("boolean", boolean.class); + CLASS_MAP.put("byte", byte.class); + CLASS_MAP.put("char", char.class); + CLASS_MAP.put("short", short.class); + CLASS_MAP.put("long", long.class); + } + + public static boolean isPrimitive(String type) { + return CLASS_MAP.containsKey(type); + } + + + public static Class map(String type) { + return CLASS_MAP.get(type); + } +} diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/Invocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/Invocation.java index 18c793e..62a1858 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/Invocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/Invocation.java @@ -1,13 +1,17 @@ package com.sinjinsong.toy.invocation.api; -import com.sinjinsong.toy.common.exception.RPCException; +import com.sinjinsong.toy.common.domain.RPCRequest; import com.sinjinsong.toy.common.domain.RPCResponse; +import com.sinjinsong.toy.common.exception.RPCException; +import com.sinjinsong.toy.protocol.api.InvokeParam; + +import java.util.concurrent.Future; +import java.util.function.Function; /** * @author sinjinsong * @date 2018/7/7 */ public interface Invocation { - RPCResponse invoke() throws RPCException; - + RPCResponse invoke(InvokeParam invokeParam, Function> requestProcessor) throws RPCException; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/support/AbstractInvocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/support/AbstractInvocation.java index c067066..1be02a8 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/support/AbstractInvocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/api/support/AbstractInvocation.java @@ -1,12 +1,14 @@ package com.sinjinsong.toy.invocation.api.support; +import com.sinjinsong.toy.common.domain.RPCRequest; +import com.sinjinsong.toy.common.domain.RPCResponse; import com.sinjinsong.toy.common.enumeration.ErrorEnum; import com.sinjinsong.toy.common.exception.RPCException; +import com.sinjinsong.toy.common.util.InvokeParamUtil; import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.invocation.api.Invocation; -import com.sinjinsong.toy.common.domain.RPCRequest; -import com.sinjinsong.toy.common.domain.RPCResponse; +import com.sinjinsong.toy.protocol.api.InvokeParam; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Future; @@ -18,35 +20,17 @@ */ @Slf4j public abstract class AbstractInvocation implements Invocation { - private ReferenceConfig referenceConfig; - private RPCRequest rpcRequest; - private Function> processor; - - - public final void setReferenceConfig(ReferenceConfig referenceConfig) { - this.referenceConfig = referenceConfig; - } - - public final void setRpcRequest(RPCRequest rpcRequest) { - this.rpcRequest = rpcRequest; - } - - public final void setProcessor(Function> processor){ - this.processor = processor; - } - protected final Future doCustomProcess() { - return processor.apply(rpcRequest); - } - @Override - public final RPCResponse invoke() throws RPCException { + public final RPCResponse invoke(InvokeParam invokeParam, Function> requestProcessor) throws RPCException { RPCResponse response; + ReferenceConfig referenceConfig = InvokeParamUtil.extractReferenceConfigFromInvokeParam(invokeParam); + RPCRequest rpcRequest = InvokeParamUtil.extractRequestFromInvokeParam(invokeParam); try { - response = doInvoke(); + response = doInvoke(rpcRequest, referenceConfig,requestProcessor); } catch (Throwable e) { e.printStackTrace(); - throw new RPCException(e,ErrorEnum.TRANSPORT_FAILURE, "transport异常"); + throw new RPCException(e, ErrorEnum.TRANSPORT_FAILURE, "transport异常"); } return response; } @@ -57,13 +41,5 @@ public final RPCResponse invoke() throws RPCException { * @return * @throws Throwable */ - protected abstract RPCResponse doInvoke() throws Throwable; - - public final ReferenceConfig getReferenceConfig() { - return referenceConfig; - } - - public final RPCRequest getRpcRequest() { - return rpcRequest; - } + protected abstract RPCResponse doInvoke(RPCRequest rpcRequest, ReferenceConfig referenceConfig,Function> requestProcessor) throws Throwable; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/async/AsyncInvocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/async/AsyncInvocation.java index 090f7c7..7e7a3b2 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/async/AsyncInvocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/async/AsyncInvocation.java @@ -1,6 +1,8 @@ package com.sinjinsong.toy.invocation.async; import com.sinjinsong.toy.common.context.RPCThreadLocalContext; +import com.sinjinsong.toy.common.domain.RPCRequest; +import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; import com.sinjinsong.toy.common.domain.RPCResponse; @@ -8,16 +10,17 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; /** * @author sinjinsong * @date 2018/6/10 */ public class AsyncInvocation extends AbstractInvocation { - + @Override - protected RPCResponse doInvoke() throws Throwable { - Future future = doCustomProcess(); + protected RPCResponse doInvoke(RPCRequest rpcRequest, ReferenceConfig referenceConfig, Function> requestProcessor) throws Throwable { + Future future = requestProcessor.apply(rpcRequest); Future responseForUser = new Future() { @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -57,5 +60,4 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec RPCThreadLocalContext.getContext().setFuture(responseForUser); return null; } - } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/callback/CallbackInvocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/callback/CallbackInvocation.java index cc42cb4..1d30a01 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/callback/CallbackInvocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/callback/CallbackInvocation.java @@ -8,6 +8,9 @@ import com.sinjinsong.toy.common.domain.RPCRequest; import com.sinjinsong.toy.common.domain.RPCResponse; +import java.util.concurrent.Future; +import java.util.function.Function; + /** * @author sinjinsong * @date 2018/6/10 @@ -19,24 +22,23 @@ * 简言之就是客户端RPC服务器,服务器RPC客户端。 * 这里约定,客户端rpc服务器,服务器不会影响该request;服务器转而会rpc服务器,两个request的id是一样的。 * 通过这个相同的requestid来定位callback实例。 - * + *

* 注意!如果调用失败,则callback不会被调用 */ -public class CallbackInvocation extends AbstractInvocation { +public class CallbackInvocation extends AbstractInvocation { + @Override - protected RPCResponse doInvoke() throws Throwable { - RPCRequest rpcRequest = getRpcRequest(); - ReferenceConfig referenceConfig = getReferenceConfig(); + protected RPCResponse doInvoke(RPCRequest rpcRequest, ReferenceConfig referenceConfig, Function> requestProcessor) throws Throwable { Object callbackInstance = rpcRequest.getParameters()[referenceConfig.getCallbackParamIndex()]; // 该实例无需序列化 rpcRequest.getParameters()[referenceConfig.getCallbackParamIndex()] = null; - registerCallbackHandler(rpcRequest, callbackInstance); - doCustomProcess(); + registerCallbackHandler(rpcRequest, callbackInstance,referenceConfig); + requestProcessor.apply(rpcRequest); return null; } - - private void registerCallbackHandler(RPCRequest request, Object callbackInstance) { + + private void registerCallbackHandler(RPCRequest request, Object callbackInstance,ReferenceConfig referenceConfig) { Class interfaceClass = callbackInstance.getClass().getInterfaces()[0]; ServiceConfig config = ServiceConfig.builder() @@ -44,7 +46,7 @@ private void registerCallbackHandler(RPCRequest request, Object callbackInstance .interfaceClass((Class) interfaceClass) .isCallbackInterface(true) .ref(callbackInstance).build(); - RPCThreadSharedContext.registerHandler(generateCallbackHandlerKey(request, getReferenceConfig()), + RPCThreadSharedContext.registerHandler(generateCallbackHandlerKey(request, referenceConfig), config); } @@ -56,4 +58,6 @@ public static String generateCallbackHandlerKey(RPCRequest request) { private static String generateCallbackHandlerKey(RPCRequest request, ReferenceConfig referenceConfig) { return new StringBuilder(request.getRequestId()).append(".").append(request.getParameterTypes()[referenceConfig.getCallbackParamIndex()].getName()).toString(); } + + } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/oneway/OneWayInvocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/oneway/OneWayInvocation.java index e0b885b..b5a59c3 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/oneway/OneWayInvocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/oneway/OneWayInvocation.java @@ -1,16 +1,22 @@ package com.sinjinsong.toy.invocation.oneway; +import com.sinjinsong.toy.common.domain.RPCRequest; +import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; import com.sinjinsong.toy.common.domain.RPCResponse; +import java.util.concurrent.Future; +import java.util.function.Function; + /** * @author sinjinsong * @date 2018/6/10 */ public class OneWayInvocation extends AbstractInvocation { + @Override - protected RPCResponse doInvoke() throws Throwable { - doCustomProcess(); + protected RPCResponse doInvoke(RPCRequest rpcRequest, ReferenceConfig referenceConfig, Function> requestProcessor) throws Throwable { + requestProcessor.apply(rpcRequest); return null; } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/sync/SyncInvocation.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/sync/SyncInvocation.java index 2d0ce8f..f1b059c 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/sync/SyncInvocation.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/invocation/sync/SyncInvocation.java @@ -1,11 +1,14 @@ package com.sinjinsong.toy.invocation.sync; -import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; +import com.sinjinsong.toy.common.domain.RPCRequest; import com.sinjinsong.toy.common.domain.RPCResponse; +import com.sinjinsong.toy.config.ReferenceConfig; +import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * @author sinjinsong @@ -15,9 +18,9 @@ public class SyncInvocation extends AbstractInvocation { @Override - protected RPCResponse doInvoke() throws Throwable { - Future future = doCustomProcess(); - RPCResponse response = future.get(getReferenceConfig().getTimeout(), TimeUnit.MILLISECONDS); + protected RPCResponse doInvoke(RPCRequest rpcRequest, ReferenceConfig referenceConfig, Function> requestProcessor) throws Throwable { + Future future = requestProcessor.apply(rpcRequest); + RPCResponse response = future.get(referenceConfig.getTimeout(), TimeUnit.MILLISECONDS); log.info("客户端读到响应:{}", response); return response; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java index 70b957d..c675aad 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java @@ -1,21 +1,15 @@ package com.sinjinsong.toy.protocol.api.support; +import com.sinjinsong.toy.common.domain.RPCRequest; +import com.sinjinsong.toy.common.domain.RPCResponse; import com.sinjinsong.toy.common.enumeration.ErrorEnum; +import com.sinjinsong.toy.common.enumeration.InvocationType; import com.sinjinsong.toy.common.exception.RPCException; -import com.sinjinsong.toy.common.util.InvokeParamUtil; import com.sinjinsong.toy.config.GlobalConfig; -import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.filter.Filter; -import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; -import com.sinjinsong.toy.invocation.async.AsyncInvocation; -import com.sinjinsong.toy.invocation.callback.CallbackInvocation; -import com.sinjinsong.toy.invocation.oneway.OneWayInvocation; -import com.sinjinsong.toy.invocation.sync.SyncInvocation; import com.sinjinsong.toy.protocol.api.InvokeParam; import com.sinjinsong.toy.protocol.api.Invoker; import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.common.domain.RPCRequest; -import com.sinjinsong.toy.common.domain.RPCResponse; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -40,23 +34,7 @@ public RPCResponse invoke(InvokeParam invokeParam) throws RPCException { // TODO 想办法在编译时检查 throw new RPCException(ErrorEnum.GET_PROCESSOR_MUST_BE_OVERRIDE_WHEN_INVOKE_DID_NOT_OVERRIDE, "没有重写AbstractInvoker#invoke方法的时候,必须重写getProcessor方法"); } - // 如果提交任务失败,则删掉该Endpoint,再次提交的话必须重新创建Endpoint - AbstractInvocation invocation; - ReferenceConfig referenceConfig = InvokeParamUtil.extractReferenceConfigFromInvokeParam(invokeParam); - RPCRequest rpcRequest = InvokeParamUtil.extractRequestFromInvokeParam(invokeParam); - if (referenceConfig.isAsync()) { - invocation = new AsyncInvocation(); - } else if (referenceConfig.isCallback()) { - invocation = new CallbackInvocation(); - } else if (referenceConfig.isOneWay()) { - invocation = new OneWayInvocation(); - } else { - invocation = new SyncInvocation(); - } - invocation.setReferenceConfig(referenceConfig); - invocation.setRpcRequest(rpcRequest); - invocation.setProcessor(logic); - return invocation.invoke(); + return InvocationType.get(invokeParam).invoke(invokeParam,logic); } /** diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JavassistRPCProxyFactory.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JavassistRPCProxyFactory.java index 4de6cfc..70be34e 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JavassistRPCProxyFactory.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JavassistRPCProxyFactory.java @@ -4,9 +4,8 @@ import com.sinjinsong.toy.common.exception.RPCException; import com.sinjinsong.toy.protocol.api.Invoker; import com.sinjinsong.toy.proxy.api.support.AbstractRPCProxyFactory; -import javassist.util.proxy.MethodHandler; -import javassist.util.proxy.ProxyFactory; -import javassist.util.proxy.ProxyObject; +import javassist.*; +import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; @@ -14,32 +13,126 @@ * @author sinjinsong * @date 2018/8/23 */ +@Slf4j public class JavassistRPCProxyFactory extends AbstractRPCProxyFactory { + private CtClass invokerCtClass = new CtClass(Invoker.class.getName()) { + }; + + private CtClass interceptorCtClass = new CtClass(AbstractRPCProxyFactory.class.getName()) { + }; + + @Override protected T doCreateProxy(Class interfaceClass, Invoker invoker) { - //代理对象 - ProxyFactory factory = new ProxyFactory(); - - //设定需要代理的类 - factory.setInterfaces(new Class[]{interfaceClass}); - - //创建class - Class clazz = factory.createClass(); - //实例化对象 T t = null; try { - t = interfaceClass.cast(clazz.newInstance()); - } catch (Exception e) { - throw new RPCException(ErrorEnum.CREATE_PROXY_ERROR,"Javassist创建代理异常"); - } - //设置代理对象 - ((ProxyObject)t).setHandler(new MethodHandler() { - - @Override - public Object invoke(Object obj, Method method, Method process, Object[] args) throws Throwable { - return JavassistRPCProxyFactory.this.invokeProxyMethod(invoker,method,args); + String interfaceName = interfaceClass.getName(); + ClassPool pool = ClassPool.getDefault(); + // 传入类名,最后生成某种Interface + // 我们保证某个interface只会生成一个代理类 + CtClass proxyClass = pool.makeClass(interfaceName + "$proxyInvoker"); + // 设置接口类型 + proxyClass.setInterfaces(new CtClass[]{pool.getCtClass(interfaceName)}); + CtField invokerField = new CtField(this.invokerCtClass, "invoker", proxyClass); + invokerField.setModifiers(Modifier.PRIVATE | Modifier.FINAL); + proxyClass.addField(invokerField); + CtField interceptorField = new CtField(this.interceptorCtClass, "interceptor", proxyClass); + interceptorField.setModifiers(Modifier.PRIVATE | Modifier.FINAL); + proxyClass.addField(interceptorField); + + CtConstructor ctConstructor = new CtConstructor(new CtClass[]{this.invokerCtClass, this.interceptorCtClass}, proxyClass); + ctConstructor.setModifiers(Modifier.PUBLIC); + ctConstructor.setBody("{this.invoker=$1;this.interceptor=$2;}"); + proxyClass.addConstructor(ctConstructor); + Method[] methods = interfaceClass.getMethods(); + for (int i = 0; i < methods.length; i++) { + Method method = methods[i]; + addInterfaceMethod(interfaceName, proxyClass, method); } - }); + addCommonMethods(interfaceName, proxyClass); + t = interfaceClass.cast(proxyClass.toClass().getConstructor(Invoker.class, AbstractRPCProxyFactory.class).newInstance(invoker, this)); + } catch (Exception e) { + e.printStackTrace(); + throw new RPCException(ErrorEnum.CREATE_PROXY_ERROR, "生成Javassist动态代理失败", e); + } return t; } + + private void addCommonMethods(String interfaceName, CtClass proxyClass) { +// if (methodName.equals("toString") && parameterTypes.length == 0) { +// methodDeclare.append("return invokerCtClass.toString();}"); +// } else if ("hashCode".equals(method.getName()) && method.getParameterTypes().length == 0) { +// methodDeclare.append("return invokerCtClass.hashCode();}"); +// } else if ("equals".equals(method.getName()) && method.getParameterTypes().length == 1) { +// methodDeclare.append("invokerCtClass.equals(args[0]);}"); +// } else { + } + + private static void addInterfaceMethod(String classToProxy, CtClass implementer, Method method) throws CannotCompileException { + String methodCode = generateMethodCode(classToProxy, method); + CtMethod cm = CtNewMethod.make(methodCode, implementer); + implementer.addMethod(cm); + } + + private static String generateMethodCode(String interfaceName, Method method) { + String methodName = method.getName(); + String methodReturnType = method.getReturnType().getName(); + Class[] parameterTypes = method.getParameterTypes(); + Class[] exceptionTypes = method.getExceptionTypes(); + + //组装方法的Exception声明 + StringBuilder exceptionBuffer = new StringBuilder(); + if (exceptionTypes.length > 0) exceptionBuffer.append(" throws "); + for (int i = 0; i < exceptionTypes.length; i++) { + if (i != exceptionTypes.length - 1) { + exceptionBuffer.append(exceptionTypes[i].getName()).append(","); + } else { + exceptionBuffer.append(exceptionTypes[i].getName()); + } + } + + //组装方法的参数列表 + StringBuilder parameterBuffer = new StringBuilder(); + for (int i = 0; i < parameterTypes.length; i++) { + Class parameter = parameterTypes[i]; + String parameterType = parameter.getName(); + //动态指定方法参数的变量名 + String refName = "a" + i; + parameterBuffer.append(parameterType).append(" " + refName); + if (i != parameterTypes.length - 1) { + parameterBuffer.append(","); + } + } + + //方法声明,由于是实现接口的方法,所以是public + StringBuilder methodDeclare = new StringBuilder(); + methodDeclare.append("public ").append(methodReturnType).append(" ").append(methodName).append("(").append(parameterBuffer).append(")").append(exceptionBuffer).append(" {"); +// methodDeclare.append("System.out.println(a0);"); +// methodDeclare.append("System.out.println(new Object[]{a0});"); + // 方法体 + methodDeclare.append("return interceptor.invokeProxyMethod(") + .append("invoker").append(",") + .append("\"") + .append(interfaceName).append("\"") + .append(",") + .append("\"").append(methodName).append("\"") + .append(",") + .append("new String[]{"); + for (int i = 0; i < parameterTypes.length; i++) { + methodDeclare.append("\"").append(parameterTypes[i].getName()).append("\""); + } + methodDeclare.append("}"); + methodDeclare.append(","); + //传递方法里的参数 + methodDeclare.append("new Object[]{"); + for (int i = 0; i < parameterTypes.length; i++) { + methodDeclare.append("a").append(i); + if (i != parameterTypes.length - 1) { + methodDeclare.append(","); + } + } + methodDeclare.append("});}"); + System.out.println(methodDeclare.toString()); + return methodDeclare.toString(); + } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JdkRPCProxyFactory.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JdkRPCProxyFactory.java index bfc34db..e97069a 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JdkRPCProxyFactory.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/JdkRPCProxyFactory.java @@ -19,14 +19,15 @@ public class JdkRPCProxyFactory extends AbstractRPCProxyFactory { protected T doCreateProxy(Class interfaceClass, Invoker invoker) { return (T) Proxy.newProxyInstance( invoker.getInterface().getClassLoader(), - new Class[]{invoker.getInterface()}, + new Class[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return JdkRPCProxyFactory.this.invokeProxyMethod(invoker,method,args); + return JdkRPCProxyFactory.this.invokeProxyMethod(invoker, method, args); } } ); } + } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/api/support/AbstractRPCProxyFactory.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/api/support/AbstractRPCProxyFactory.java index a5fc74b..23872a5 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/api/support/AbstractRPCProxyFactory.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/proxy/api/support/AbstractRPCProxyFactory.java @@ -1,5 +1,8 @@ package com.sinjinsong.toy.proxy.api.support; +import com.sinjinsong.toy.common.domain.GlobalRecycler; +import com.sinjinsong.toy.common.domain.RPCRequest; +import com.sinjinsong.toy.common.domain.RPCResponse; import com.sinjinsong.toy.common.exception.RPCException; import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.protocol.api.InvokeParam; @@ -7,9 +10,6 @@ import com.sinjinsong.toy.protocol.api.support.AbstractInvoker; import com.sinjinsong.toy.protocol.api.support.RPCInvokeParam; import com.sinjinsong.toy.proxy.api.RPCProxyFactory; -import com.sinjinsong.toy.common.domain.GlobalRecycler; -import com.sinjinsong.toy.common.domain.RPCRequest; -import com.sinjinsong.toy.common.domain.RPCResponse; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; @@ -25,44 +25,30 @@ public abstract class AbstractRPCProxyFactory implements RPCProxyFactory { protected Map, Object> cache = new ConcurrentHashMap<>(); - - @Override - public T createProxy(Invoker invoker) { - if (cache.containsKey(invoker.getInterface())) { - return (T) cache.get(invoker.getInterface()); - } - T t = doCreateProxy(invoker.getInterface(), invoker); - cache.put(invoker.getInterface(), t); - return t; - } - - protected abstract T doCreateProxy(Class interfaceClass, Invoker invoker); - - - protected Object invokeProxyMethod(Invoker invoker, Method method, Object[] args) { + public Object invokeProxyMethod(Invoker invoker, String interfaceName, String methodName, String[] parameterTypes, Object[] args) { // 创建并初始化 RPC 请求 - if ("toString".equals(method.getName()) && method.getParameterTypes().length == 0) { + if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } - if ("hashCode".equals(method.getName()) && method.getParameterTypes().length == 0) { + if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } - if ("equals".equals(method.getName()) && method.getParameterTypes().length == 1) { + if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 复用request RPCRequest request = GlobalRecycler.reuse(RPCRequest.class); - log.info("调用服务:{} {}", method.getDeclaringClass().getName(), method.getName()); + log.info("调用服务:{}#{},parameterTypes:{},args:{}", interfaceName, methodName,parameterTypes,args); request.setRequestId(UUID.randomUUID().toString()); - request.setInterfaceName(method.getDeclaringClass().getName()); - request.setMethodName(method.getName()); - request.setParameterTypes(method.getParameterTypes()); + request.setInterfaceName(interfaceName); + request.setMethodName(methodName); + request.setParameterTypes(parameterTypes); request.setParameters(args); // 通过 RPC 客户端发送 RPC 请求并获取 RPC 响应 // ClusterInvoker RPCInvokeParam invokeParam = RPCInvokeParam.builder() .rpcRequest(request) - .referenceConfig(ReferenceConfig.getReferenceConfigByInterfaceName(method.getDeclaringClass().getName())) + .referenceConfig(ReferenceConfig.getReferenceConfigByInterfaceName(interfaceName)) .build(); RPCResponse response = invoker.invoke(invokeParam); Object result = null; @@ -75,6 +61,28 @@ protected Object invokeProxyMethod(Invoker invoker, Method method, Object[] args return result; } + protected Object invokeProxyMethod(Invoker invoker, Method method, Object[] args) { + Class[] parameterTypes = method.getParameterTypes(); + String[] paramTypes = new String[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + paramTypes[i] = parameterTypes[i].getName(); + }; + return invokeProxyMethod(invoker,method.getDeclaringClass().getName(),method.getName(),paramTypes,args); + } + + @Override + public T createProxy(Invoker invoker) { + if (cache.containsKey(invoker.getInterface())) { + return (T) cache.get(invoker.getInterface()); + } + T t = doCreateProxy(invoker.getInterface(), invoker); + cache.put(invoker.getInterface(), t); + return t; + } + + protected abstract T doCreateProxy(Class interfaceClass, Invoker invoker); + + @Override public Invoker getInvoker(T proxy, Class type) { return new AbstractInvoker() { diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/RPCTaskRunner.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/RPCTaskRunner.java index ef4f20c..8d74c7b 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/RPCTaskRunner.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/RPCTaskRunner.java @@ -1,16 +1,15 @@ package com.sinjinsong.toy.transport.api.support; -import com.sinjinsong.toy.config.ServiceConfig; -import com.sinjinsong.toy.transport.api.converter.MessageConverter; import com.sinjinsong.toy.common.domain.GlobalRecycler; import com.sinjinsong.toy.common.domain.Message; import com.sinjinsong.toy.common.domain.RPCRequest; import com.sinjinsong.toy.common.domain.RPCResponse; +import com.sinjinsong.toy.config.ServiceConfig; +import com.sinjinsong.toy.transport.api.converter.MessageConverter; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -112,15 +111,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } ); } - try { - return method.invoke(serviceBean, parameters); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } catch (IllegalArgumentException e) { - e.printStackTrace(); - } catch (InvocationTargetException e) { - throw e.getCause(); - } - return null; + return method.invoke(serviceBean, parameters); } }