Skip to content

Commit

Permalink
合并版本
Browse files Browse the repository at this point in the history
  • Loading branch information
chengyouling committed Jan 3, 2025
1 parent 1734f10 commit 97db7db
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 528 deletions.
457 changes: 26 additions & 431 deletions .github/workflows/message_gray_integration_test.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ public String clearMessageCount() {
}

/**
* init push consumer
* init consumer
*
* @param consumerType consumerType
* @return init status
*/
@GetMapping("/initConsumer")
public String initConsumer(@RequestParam("consumerType") String consumerType) {
if ("PUSH".equals(consumerType)) {
pushConsumer.initPushConsume(topic + "-PUSH", nameServer);
pushConsumer.initPushConsumer(topic + "-PUSH", nameServer);
} else if ("PULL".equals(consumerType)) {
pullConsumer.initPullConsumer(nameServer, topic + "-PULL");
} else {
Expand All @@ -76,6 +76,24 @@ public String initConsumer(@RequestParam("consumerType") String consumerType) {
return "success";
}

/**
* shutdown consumer
*
* @param consumerType consumerType
* @return status
*/
@GetMapping("/shutdownConsumer")
public String shutdownConsumer(@RequestParam("consumerType") String consumerType) {
if ("PUSH".equals(consumerType)) {
pushConsumer.shutdownPushConsumer();
} else if ("PULL".equals(consumerType)) {
pullConsumer.shutdownPullConsumer();
} else {
litePullConsumer.shutdownLitePullConsumer();
}
return "success";
}

/**
* pull message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public Map<String, Object> getMessageCount(String mqTopic, String mqAddress) {
return RocketMqMessageUtils.getMessageCount();
}

/**
* shutdown consumer
*/
public void shutdownLitePullConsumer() {
executorService.shutdown();
if (litePullConsumer != null) {
litePullConsumer.shutdown();
}
}

/**
* lite pull consumer runnable
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ private static long getMessageQueueOffSet(MessageQueue mq) {
return OFF_SET_TABLE.getOrDefault(mq, 0L);
}

/**
* shutdown consumer
*/
public void shutdownPullConsumer() {
executorService.shutdown();
if (pullConsumer != null) {
pullConsumer.shutdown();
}
}

/**
* pull consumer runnable
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class RocketMqPushConsumer {
* @param mqTopic topic
* @param mqAddress address
*/
public void initPushConsume(String mqTopic, String mqAddress) {
public void initPushConsumer(String mqTopic, String mqAddress) {
try {
if (pushConsumer == null) {
pushConsumer = new DefaultMQPushConsumer("default");
Expand Down Expand Up @@ -76,8 +76,17 @@ public void initPushConsume(String mqTopic, String mqAddress) {
*/
public Map<String, Object> getMessageCount(String mqTopic, String mqAddress) {
if (pushConsumer == null) {
initPushConsume(mqTopic, mqAddress);
initPushConsumer(mqTopic, mqAddress);
}
return RocketMqMessageUtils.getMessageCount();
}

/**
* shutdown consumer
*/
public void shutdownPushConsumer() {
if (pushConsumer != null) {
pushConsumer.shutdown();
}
}
}
Loading

0 comments on commit 97db7db

Please sign in to comment.