Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The message queue is not in assigned list #107

Open
andyyumiao opened this issue Dec 28, 2023 · 9 comments
Open

The message queue is not in assigned list #107

andyyumiao opened this issue Dec 28, 2023 · 9 comments

Comments

@andyyumiao
Copy link

andyyumiao commented Dec 28, 2023

I got the error when i consume rocketmq message in flink job:

17:51:50.537 [rmq-pull-thread-1] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 2/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
   at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
   at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
   at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
   ... 5 common frames omitted
17:51:51.342 [rmq-pull-thread-2] ERROR org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil - RuntimeException, retry 3/5
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:348)
   at org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil.call(RetryUtil.java:56)
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:276)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, may be rebalancing, message queue: MessageQueue [topic=yumiao1205, brokerName=broker-a, queueId=2]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
   at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:658)
   at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seek(DefaultLitePullConsumer.java:298)
   at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:282)
   ... 5 common frames omitted

My rocketmq version: 5.1.1
flink version: 1.15.0

@cj495840252
Copy link

I also encounter this error, it's seem that still not solved

@humkum
Copy link
Contributor

humkum commented Mar 12, 2024

See #96

@cj495840252
Copy link

I am still have this error
while i run the simpleConsumer it's working normally, it's mean can connect rocketmq?
image

it's still failed while I run the ConnectorExample
RocketMQ: 5.0.0

2024-03-12 23:05:42  WARN [   Source: Custom Source (1/2)#0] [e.flink.runtime.taskmanager.Task] Source: Custom Source (1/2)#0 (ea15ccd58a1ef50baebef0860ee2e52b) switched from INITIALIZING to FAILED with failure cause: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=test, brokerName=broker-a, queueId=1]
For more information, please visit the url, https://rocketmq.apache.org/docs/bestPractice/06FAQ
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seek(DefaultLitePullConsumerImpl.java:660)
	at org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.seekToEnd(DefaultLitePullConsumerImpl.java:693)
	at org.apache.rocketmq.client.consumer.DefaultLitePullConsumer.seekToEnd(DefaultLitePullConsumer.java:394)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.initOffsets(RocketMQSourceFunction.java:394)
	at org.apache.flink.connector.rocketmq.legacy.RocketMQSourceFunction.open(RocketMQSourceFunction.java:246)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:750)

@cj495840252
Copy link

See #96
hi, good morning , this is my setting
image

@madi1819
Copy link

I have also encountered this problem. Has it been resolved
image

@madi1819
Copy link

See #96

this is my test code
image

@loserwang1024
Copy link

See #96
It seems do nothing!

@ping-cai
Copy link

ping-cai commented Jun 7, 2024

it seens do nothing! this brantch 'main-latest' still exist! who can help fix this problem
RocketMQ Version:4.9.2
Flink Version: 1.15.2
problem details:
Caused by: org.apache.rocketmq.client.exception.MQClientException: The message queue is not in assigned list, message queue: MessageQueue [topic=tp_ld_driver_behavior_score_change, brokerName=broker-onlinemq1-d, queueId=0]
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
I guess this issue comes from the class 'DefaultLitePullConsumer',Previously, it was DefaultMQPullConsumer,This class will be removed in 2022, and a better implementation {@link DefaultLitePullConsumer} is recommend to use

@liangjw
Copy link

liangjw commented Aug 18, 2024

Is this bug fixed? I have encountered this issue and found the messagequeue shoud be assigned. But I cant find any assined code.

The following code is runnable if assign the queue to consumer manually:

`
Collection totalQueues = consumer.fetchMessageQueues(topic);
consumer.assign(totalQueues);
//it works

consumer.seekToEnd(totalQueues.iterator().next());
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants