Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持Rocketmq 消息发送、消费的数据录制 #187

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/package.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cp ./repeater-logback.xml ${REPEATER_TARGET_DIR}/cfg/repeater-logback.xml \
&& cp ../repeater-plugins/mybatis-plugin/target/mybatis-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/mybatis-plugin.jar \
&& cp ../repeater-plugins/dubbo-plugin/target/dubbo-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/dubbo-plugin.jar \
&& cp ../repeater-plugins/redis-plugin/target/redis-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/redis-plugin.jar \
&& cp ../repeater-plugins/rocketmq-plugin/target/rocketmq-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/rocketmq-plugin.jar \
&& cp ../repeater-plugins/http-plugin/target/http-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/http-plugin.jar \
&& cp ../repeater-plugins/hibernate-plugin/target/hibernate-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/hibernate-plugin.jar \
&& cp ../repeater-plugins/spring-data-jpa-plugin/target/spring-data-jpa-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/spring-data-jpa-plugin.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class InvokeType implements java.io.Serializable {

public static InvokeType DUBBO = new InvokeType("dubbo");


public static InvokeType ROCKETMQ = new InvokeType("rocketmq");

public static InvokeType HIBERNATE = new InvokeType("hibernate");

public static InvokeType JPA = new InvokeType("jpa");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.alibaba.jvm.sandbox.repeater.plugin.domain;

/**
* @author deipss
* @since 1.0.0
*/
public class RocketInvocation extends Invocation{

/**
* 消息ID
*/
private String messageId;

/**
* 消息主題
*/
private String messageTopic;

/**
* 消息tags
*/
private String messageTags;

/**
* 消息
*/
private String messageBody;


public String getMessageId() {
return messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}

public String getMessageTopic() {
return messageTopic;
}

public void setMessageTopic(String messageTopic) {
this.messageTopic = messageTopic;
}

public String getMessageTags() {
return messageTags;
}

public void setMessageTags(String messageTags) {
this.messageTags = messageTags;
}

public String getMessageBody() {
return messageBody;
}

public void setMessageBody(String messageBody) {
this.messageBody = messageBody;
}
}
1 change: 1 addition & 0 deletions repeater-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<module>eh-cache-plugin</module>
<module>guava-cache-plugin</module>
<module>okhttp-plugin</module>
<module>rocketmq-plugin</module>
</modules>

<dependencies>
Expand Down
23 changes: 23 additions & 0 deletions repeater-plugins/rocketmq-plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# rocketmq插件

基于Apache rocketmq Client 库实现的rocketmq插件,能够录制发送方和消費方的消息体数据


## 详细设计

### 消息发送方
> 对org.apache.rocketmq.client.producer.DefaultMQProducer的send方法进行插装增强,


### 消息消费方
> 消息消费时,一般两种方式MessageListenerConcurrently和MessageListenerOrderly,都是一个consumeMessage()方法,因此对consumeMessage方法进行增强,获取org.apache.rocketmq.common.message.MessageExt对象中的消息体数据
- 注意的是,一般消息消费时,会是一次调用的入口
```java
@Override
public boolean isEntrance() {
return true;
}
```
## 测试验证

**rocketmq:4.2.0** 版本,可以正常录制消費方和发送方的数据
36 changes: 36 additions & 0 deletions repeater-plugins/rocketmq-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>repeater-plugins</artifactId>
<groupId>com.alibaba.jvm.sandbox</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rocketmq-plugin</artifactId>

<build>
<finalName>${project.name}-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.BeforeEvent;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultEventListener;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.RocketInvocation;
import org.apache.commons.lang3.reflect.MethodUtils;

public class RocketmqListenerEventListener extends DefaultEventListener {

public RocketmqListenerEventListener(InvokeType invokeType, boolean entrance, InvocationListener listener, InvocationProcessor processor) {
super(invokeType, entrance, listener, processor);
}

@Override
protected Invocation initInvocation(BeforeEvent beforeEvent) {
RocketInvocation rocketInvocation = new RocketInvocation();
try {
Object mqArg = beforeEvent.argumentArray[0];
Object msgExt = MethodUtils.invokeMethod(mqArg, "get", 0);
String topic = String.valueOf(MethodUtils.invokeMethod(msgExt, "getTopic"));
String tags = String.valueOf(MethodUtils.invokeMethod(msgExt, "getTags"));
String msgId = String.valueOf(MethodUtils.invokeMethod(msgExt, "getMsgId"));
String body = new String((byte[]) MethodUtils.invokeMethod(msgExt, "getBody"));
rocketInvocation.setMessageTopic(topic);
rocketInvocation.setMessageTags(tags);
rocketInvocation.setMessageId(msgId);
rocketInvocation.setMessageBody(body);
} catch (Exception exception) {
log.error("init listener rocketInvocation error", exception);
}
return rocketInvocation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.BeforeEvent;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultInvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;

public class RocketmqListenerInvocationProcessor extends DefaultInvocationProcessor {
public RocketmqListenerInvocationProcessor(InvokeType type) {
super(type);
}

@Override
public Object[] assembleRequest(BeforeEvent event) {
if (event.argumentArray != null && event.argumentArray.length > 0) {
return new Object[]{event.argumentArray[0]};
}
return event.argumentArray;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.Event;
import com.alibaba.jvm.sandbox.api.listener.EventListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.AbstractInvokePluginAdapter;
import com.alibaba.jvm.sandbox.repeater.plugin.core.model.EnhanceModel;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.google.common.collect.Lists;

import java.util.List;

public class RocketmqListenerPlugin extends AbstractInvokePluginAdapter {
@Override
public InvokeType getType() {
return InvokeType.ROCKETMQ;
}

@Override
public String identity() {
return "rocketmq-consumer";
}

@Override
public boolean isEntrance() {
return true;
}

@Override
protected List<EnhanceModel> getEnhanceModels() {
EnhanceModel messageListenerConcurrently = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently")
.methodPatterns(EnhanceModel.MethodPattern.transform("consumeMessage"))
.includeSubClasses(true)
.watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS)
.build();
EnhanceModel messageListenerOrderly = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly")
.includeSubClasses(true)
.methodPatterns(EnhanceModel.MethodPattern.transform("consumeMessage"))
.watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS)
.build();
return Lists.newArrayList(messageListenerConcurrently, messageListenerOrderly);
}

@Override
protected InvocationProcessor getInvocationProcessor() {
return new RocketmqListenerInvocationProcessor(getType());
}

@Override
protected EventListener getEventListener(InvocationListener listener) {
return new RocketmqListenerEventListener(getType(), isEntrance(), listener, getInvocationProcessor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.BeforeEvent;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultEventListener;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.RocketInvocation;
import org.apache.commons.lang3.reflect.MethodUtils;

public class RocketmqProducerEventListener extends DefaultEventListener {

public RocketmqProducerEventListener(InvokeType invokeType, boolean entrance, InvocationListener listener, InvocationProcessor processor) {
super(invokeType, entrance, listener, processor);
}

@Override
protected Invocation initInvocation(BeforeEvent beforeEvent) {
RocketInvocation rocketInvocation = new RocketInvocation();
Object mqArg = beforeEvent.argumentArray[0];
try {
String topic = String.valueOf(MethodUtils.invokeMethod(mqArg, "getTopic"));
String tags = String.valueOf(MethodUtils.invokeMethod(mqArg, "getTags"));
String body = new String((byte[]) MethodUtils.invokeMethod(mqArg, "getBody"));
rocketInvocation.setMessageTopic(topic);
rocketInvocation.setMessageTags(tags);
rocketInvocation.setMessageBody(body);
} catch (Exception exception) {
log.error("init producer rocketInvocation error", exception);
}
return rocketInvocation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.BeforeEvent;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultInvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;

public class RocketmqProducerInvocationProcessor extends DefaultInvocationProcessor {
public RocketmqProducerInvocationProcessor(InvokeType type) {
super(type);
}

@Override
public Object[] assembleRequest(BeforeEvent event) {
if (event.argumentArray != null && event.argumentArray.length > 0) {
return new Object[]{event.argumentArray[0]};
}
return event.argumentArray;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.alibaba.jvm.sandbox.repeater.plugin.redis;

import com.alibaba.jvm.sandbox.api.event.Event;
import com.alibaba.jvm.sandbox.api.listener.EventListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationListener;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.AbstractInvokePluginAdapter;
import com.alibaba.jvm.sandbox.repeater.plugin.core.model.EnhanceModel;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.google.common.collect.Lists;

import java.util.List;

public class RocketmqProducerPlugin extends AbstractInvokePluginAdapter {
@Override
public InvokeType getType() {
return InvokeType.ROCKETMQ;
}

@Override
public String identity() {
return "rocketmq-producer";
}

@Override
public boolean isEntrance() {
return false;
}

@Override
protected List<EnhanceModel> getEnhanceModels() {
EnhanceModel producer = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.producer.DefaultMQProducer")
.methodPatterns(EnhanceModel.MethodPattern.transform("send"))
.includeSubClasses(true)
.watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS)
.build();

return Lists.newArrayList(producer);
}

@Override
protected InvocationProcessor getInvocationProcessor() {
return new RocketmqProducerInvocationProcessor(getType());
}

@Override
protected EventListener getEventListener(InvocationListener listener) {
return new RocketmqProducerEventListener(getType(), isEntrance(), listener, getInvocationProcessor());
}
}