diff --git a/bin/package.sh b/bin/package.sh index 05f134cb..9f739aec 100755 --- a/bin/package.sh +++ b/bin/package.sh @@ -28,6 +28,7 @@ cp ./repeater-logback.xml ${REPEATER_TARGET_DIR}/cfg/repeater-logback.xml \ && cp ../repeater-plugins/mybatis-plugin/target/mybatis-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/mybatis-plugin.jar \ && cp ../repeater-plugins/dubbo-plugin/target/dubbo-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/dubbo-plugin.jar \ && cp ../repeater-plugins/redis-plugin/target/redis-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/redis-plugin.jar \ + && cp ../repeater-plugins/rocketmq-plugin/target/rocketmq-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/rocketmq-plugin.jar \ && cp ../repeater-plugins/http-plugin/target/http-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/http-plugin.jar \ && cp ../repeater-plugins/hibernate-plugin/target/hibernate-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/hibernate-plugin.jar \ && cp ../repeater-plugins/spring-data-jpa-plugin/target/spring-data-jpa-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/spring-data-jpa-plugin.jar diff --git a/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/InvokeType.java b/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/InvokeType.java index 8e021ce6..e2cc0526 100644 --- a/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/InvokeType.java +++ b/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/InvokeType.java @@ -21,6 +21,9 @@ public class InvokeType implements java.io.Serializable { public static InvokeType DUBBO = new InvokeType("dubbo"); + + public static InvokeType ROCKETMQ = new InvokeType("rocketmq"); + public static InvokeType HIBERNATE = new InvokeType("hibernate"); public static InvokeType JPA = new InvokeType("jpa"); diff --git a/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/RocketInvocation.java b/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/RocketInvocation.java new file mode 100644 index 00000000..306fc00f --- /dev/null +++ b/repeater-plugin-api/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/domain/RocketInvocation.java @@ -0,0 +1,61 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.domain; + +/** + * @author deipss + * @since 1.0.0 + */ +public class RocketInvocation extends Invocation{ + + /** + * 消息ID + */ + private String messageId; + + /** + * 消息主題 + */ + private String messageTopic; + + /** + * 消息tags + */ + private String messageTags; + + /** + * 消息 + */ + private String messageBody; + + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getMessageTopic() { + return messageTopic; + } + + public void setMessageTopic(String messageTopic) { + this.messageTopic = messageTopic; + } + + public String getMessageTags() { + return messageTags; + } + + public void setMessageTags(String messageTags) { + this.messageTags = messageTags; + } + + public String getMessageBody() { + return messageBody; + } + + public void setMessageBody(String messageBody) { + this.messageBody = messageBody; + } +} diff --git a/repeater-plugins/pom.xml b/repeater-plugins/pom.xml index d87c2bdb..67f51a5a 100644 --- a/repeater-plugins/pom.xml +++ b/repeater-plugins/pom.xml @@ -25,6 +25,7 @@ eh-cache-plugin guava-cache-plugin okhttp-plugin + rocketmq-plugin diff --git a/repeater-plugins/rocketmq-plugin/README.md b/repeater-plugins/rocketmq-plugin/README.md new file mode 100644 index 00000000..af8eed57 --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/README.md @@ -0,0 +1,23 @@ +# rocketmq插件 + +基于Apache rocketmq Client 库实现的rocketmq插件,能够录制发送方和消費方的消息体数据 + + +## 详细设计 + +### 消息发送方 +> 对org.apache.rocketmq.client.producer.DefaultMQProducer的send方法进行插装增强, + + +### 消息消费方 +> 消息消费时,一般两种方式MessageListenerConcurrently和MessageListenerOrderly,都是一个consumeMessage()方法,因此对consumeMessage方法进行增强,获取org.apache.rocketmq.common.message.MessageExt对象中的消息体数据 + - 注意的是,一般消息消费时,会是一次调用的入口 +```java +@Override + public boolean isEntrance() { + return true; + } +``` +## 测试验证 + +**rocketmq:4.2.0** 版本,可以正常录制消費方和发送方的数据 diff --git a/repeater-plugins/rocketmq-plugin/pom.xml b/repeater-plugins/rocketmq-plugin/pom.xml new file mode 100644 index 00000000..0e30659b --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/pom.xml @@ -0,0 +1,36 @@ + + + + repeater-plugins + com.alibaba.jvm.sandbox + 1.0.0-SNAPSHOT + + 4.0.0 + + rocketmq-plugin + + + ${project.name}-${project.version} + + + org.apache.maven.plugins + maven-assembly-plugin + + + + attached + + package + + + jar-with-dependencies + + + + + + + + \ No newline at end of file diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerEventListener.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerEventListener.java new file mode 100644 index 00000000..4aeb50ef --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerEventListener.java @@ -0,0 +1,37 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.BeforeEvent; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultEventListener; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.RocketInvocation; +import org.apache.commons.lang3.reflect.MethodUtils; + +public class RocketmqListenerEventListener extends DefaultEventListener { + + public RocketmqListenerEventListener(InvokeType invokeType, boolean entrance, InvocationListener listener, InvocationProcessor processor) { + super(invokeType, entrance, listener, processor); + } + + @Override + protected Invocation initInvocation(BeforeEvent beforeEvent) { + RocketInvocation rocketInvocation = new RocketInvocation(); + try { + Object mqArg = beforeEvent.argumentArray[0]; + Object msgExt = MethodUtils.invokeMethod(mqArg, "get", 0); + String topic = String.valueOf(MethodUtils.invokeMethod(msgExt, "getTopic")); + String tags = String.valueOf(MethodUtils.invokeMethod(msgExt, "getTags")); + String msgId = String.valueOf(MethodUtils.invokeMethod(msgExt, "getMsgId")); + String body = new String((byte[]) MethodUtils.invokeMethod(msgExt, "getBody")); + rocketInvocation.setMessageTopic(topic); + rocketInvocation.setMessageTags(tags); + rocketInvocation.setMessageId(msgId); + rocketInvocation.setMessageBody(body); + } catch (Exception exception) { + log.error("init listener rocketInvocation error", exception); + } + return rocketInvocation; + } +} diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerInvocationProcessor.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerInvocationProcessor.java new file mode 100644 index 00000000..9abeb28b --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerInvocationProcessor.java @@ -0,0 +1,19 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.BeforeEvent; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultInvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; + +public class RocketmqListenerInvocationProcessor extends DefaultInvocationProcessor { + public RocketmqListenerInvocationProcessor(InvokeType type) { + super(type); + } + + @Override + public Object[] assembleRequest(BeforeEvent event) { + if (event.argumentArray != null && event.argumentArray.length > 0) { + return new Object[]{event.argumentArray[0]}; + } + return event.argumentArray; + } +} diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerPlugin.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerPlugin.java new file mode 100644 index 00000000..e41f4e26 --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqListenerPlugin.java @@ -0,0 +1,54 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.Event; +import com.alibaba.jvm.sandbox.api.listener.EventListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.AbstractInvokePluginAdapter; +import com.alibaba.jvm.sandbox.repeater.plugin.core.model.EnhanceModel; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; +import com.google.common.collect.Lists; + +import java.util.List; + +public class RocketmqListenerPlugin extends AbstractInvokePluginAdapter { + @Override + public InvokeType getType() { + return InvokeType.ROCKETMQ; + } + + @Override + public String identity() { + return "rocketmq-consumer"; + } + + @Override + public boolean isEntrance() { + return true; + } + + @Override + protected List getEnhanceModels() { + EnhanceModel messageListenerConcurrently = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently") + .methodPatterns(EnhanceModel.MethodPattern.transform("consumeMessage")) + .includeSubClasses(true) + .watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS) + .build(); + EnhanceModel messageListenerOrderly = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly") + .includeSubClasses(true) + .methodPatterns(EnhanceModel.MethodPattern.transform("consumeMessage")) + .watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS) + .build(); + return Lists.newArrayList(messageListenerConcurrently, messageListenerOrderly); + } + + @Override + protected InvocationProcessor getInvocationProcessor() { + return new RocketmqListenerInvocationProcessor(getType()); + } + + @Override + protected EventListener getEventListener(InvocationListener listener) { + return new RocketmqListenerEventListener(getType(), isEntrance(), listener, getInvocationProcessor()); + } +} diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerEventListener.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerEventListener.java new file mode 100644 index 00000000..13ddc2ce --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerEventListener.java @@ -0,0 +1,34 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.BeforeEvent; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultEventListener; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.RocketInvocation; +import org.apache.commons.lang3.reflect.MethodUtils; + +public class RocketmqProducerEventListener extends DefaultEventListener { + + public RocketmqProducerEventListener(InvokeType invokeType, boolean entrance, InvocationListener listener, InvocationProcessor processor) { + super(invokeType, entrance, listener, processor); + } + + @Override + protected Invocation initInvocation(BeforeEvent beforeEvent) { + RocketInvocation rocketInvocation = new RocketInvocation(); + Object mqArg = beforeEvent.argumentArray[0]; + try { + String topic = String.valueOf(MethodUtils.invokeMethod(mqArg, "getTopic")); + String tags = String.valueOf(MethodUtils.invokeMethod(mqArg, "getTags")); + String body = new String((byte[]) MethodUtils.invokeMethod(mqArg, "getBody")); + rocketInvocation.setMessageTopic(topic); + rocketInvocation.setMessageTags(tags); + rocketInvocation.setMessageBody(body); + } catch (Exception exception) { + log.error("init producer rocketInvocation error", exception); + } + return rocketInvocation; + } +} diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerInvocationProcessor.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerInvocationProcessor.java new file mode 100644 index 00000000..885f3a58 --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerInvocationProcessor.java @@ -0,0 +1,19 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.BeforeEvent; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultInvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; + +public class RocketmqProducerInvocationProcessor extends DefaultInvocationProcessor { + public RocketmqProducerInvocationProcessor(InvokeType type) { + super(type); + } + + @Override + public Object[] assembleRequest(BeforeEvent event) { + if (event.argumentArray != null && event.argumentArray.length > 0) { + return new Object[]{event.argumentArray[0]}; + } + return event.argumentArray; + } +} diff --git a/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerPlugin.java b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerPlugin.java new file mode 100644 index 00000000..dbe9e829 --- /dev/null +++ b/repeater-plugins/rocketmq-plugin/src/main/java/com/alibaba/jvm/sandbox/repeater/plugin/redis/RocketmqProducerPlugin.java @@ -0,0 +1,50 @@ +package com.alibaba.jvm.sandbox.repeater.plugin.redis; + +import com.alibaba.jvm.sandbox.api.event.Event; +import com.alibaba.jvm.sandbox.api.listener.EventListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener; +import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor; +import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.AbstractInvokePluginAdapter; +import com.alibaba.jvm.sandbox.repeater.plugin.core.model.EnhanceModel; +import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType; +import com.google.common.collect.Lists; + +import java.util.List; + +public class RocketmqProducerPlugin extends AbstractInvokePluginAdapter { + @Override + public InvokeType getType() { + return InvokeType.ROCKETMQ; + } + + @Override + public String identity() { + return "rocketmq-producer"; + } + + @Override + public boolean isEntrance() { + return false; + } + + @Override + protected List getEnhanceModels() { + EnhanceModel producer = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.producer.DefaultMQProducer") + .methodPatterns(EnhanceModel.MethodPattern.transform("send")) + .includeSubClasses(true) + .watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS) + .build(); + + return Lists.newArrayList(producer); + } + + @Override + protected InvocationProcessor getInvocationProcessor() { + return new RocketmqProducerInvocationProcessor(getType()); + } + + @Override + protected EventListener getEventListener(InvocationListener listener) { + return new RocketmqProducerEventListener(getType(), isEntrance(), listener, getInvocationProcessor()); + } +}