forked from HappySnailSunshine/JavaInterview
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MQ.md
2401 lines (1567 loc) · 57.2 KB
/
MQ.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# MQ
## 消息队列
> 上面这篇什么是消息队列文章参考自3y的文章,下面的其他的都是自己学习笔记。
### 一、什么是消息队列?
自己网上找的
消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。
消息队列,一般我们会简称它为MQ(Message Queue),嗯,就是很直白的简写。
我们先不管消息(Message)这个词,来看看队列(Queue)。这一看,队列大家应该都熟悉吧。
队列是一种先进先出的数据结构。
[![img](../media/pictures/MQ.assets/ccc415ee02dac374116ffd324e079299.jpg)](https://s5.51cto.com/oss/201904/15/ccc415ee02dac374116ffd324e079299.jpg)
在Java里边,已经实现了不少的队列了:
[<img src="../media/pictures/MQ.assets/3140455cd169462a76efc1cf484e9b0b.jpg" alt="img" style="zoom:50%;" />](https://s5.51cto.com/oss/201904/15/3140455cd169462a76efc1cf484e9b0b.jpg)
那为什么还需要消息队列(MQ)这种中间件呢???
- 到这里,大家可以先猜猜为什么要用消息队列(MQ)这种中间件,下面会继续补充。
消息队列可以简单理解为:把要传输的数据放在队列中。
[![img](../media/pictures/MQ.assets/8bcc9ce29559849fb30c4a8260759d36.png)](https://s2.51cto.com/oss/201904/15/8bcc9ce29559849fb30c4a8260759d36.png)
图片来源:https://www.cloudamqp.com/blog/2014-12-03-what-is-message-queuing.html
科普:
- 把数据放到消息队列叫做生产者
- 从消息队列里边取数据叫做消费者
### **二、为什么要用消息队列?**
为什么要用消息队列,也就是在问:用了消息队列有什么好处。我们看看以下的场景
**2.1 解耦**
现在我有一个系统A,系统A可以产生一个userId
[![img](../media/pictures/MQ.assets/6684b608abe04b214b92ee17f2889c12.jpg)](https://s4.51cto.com/oss/201904/15/6684b608abe04b214b92ee17f2889c12.jpg)
然后,现在有系统B和系统C都需要这个userId去做相关的操作
[![img](../media/pictures/MQ.assets/f66c17df010a66b1485461efe0138b77.jpg)](https://s3.51cto.com/oss/201904/15/f66c17df010a66b1485461efe0138b77.jpg)
写成伪代码可能是这样的:
```
public class SystemA {
// 系统B和系统C的依赖
SystemB systemB = new SystemB();
SystemC systemC = new SystemC();
// 系统A独有的数据userId
private String userId = "Java3y";
public void doSomething() { // 系统B和系统C都需要拿着系统A的userId去操作其他的事 systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
}
```
结构图如下:
[![img](../media/pictures/MQ.assets/357f83f657673a37d69efad5f726527b.jpg)](https://s4.51cto.com/oss/201904/15/357f83f657673a37d69efad5f726527b.jpg)
ok,一切平安无事度过了几个天。
某一天,系统B的负责人告诉系统A的负责人,现在系统B的SystemBNeed2do(String userId)这个接口不再使用了,让系统A别去调它了。
于是,系统A的负责人说"好的,那我就不调用你了。",于是就把调用系统B接口的代码给删掉了:
```
public void doSomething() {
// 系统A不再调用系统B的接口了
//systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
```
又过了几天,系统D的负责人接了个需求,也需要用到系统A的userId,于是就跑去跟系统A的负责人说:"老哥,我要用到你的userId,你调一下我的接口吧"
于是系统A说:"没问题的,这就搞"
[![img](../media/pictures/MQ.assets/ebe507302ee87c6b2d29e32b42d79ee9.jpg)](https://s1.51cto.com/oss/201904/15/ebe507302ee87c6b2d29e32b42d79ee9.jpg)
然后,系统A的代码如下:
```java
public class SystemA {
// 已经不再需要系统B的依赖了
// SystemB systemB = new SystemB();
// 系统C和系统D的依赖
SystemC systemC = new SystemC();
SystemD systemD = new SystemD();
// 系统A独有的数据
private String userId = "Java3y";
public void doSomething() {
// 已经不再需要系统B的依赖了
//systemB.SystemBNeed2do(userId);
// 系统C和系统D都需要拿着系统A的userId去操作其他的事 systemC.SystemCNeed2do(userId);
systemD.SystemDNeed2do(userId);
}
}
```
时间飞逝:
- 又过了几天,系统E的负责人过来了,告诉系统A,需要userId。
- 又过了几天,系统B的负责人过来了,告诉系统A,还是重新掉那个接口吧。
- 又过了几天,系统F的负责人过来了,告诉系统A,需要userId。
- ……
于是系统A的负责人,每天都被这给骚扰着,改来改去,改来改去…….
还有另外一个问题,调用系统C的时候,如果系统C挂了,系统A还得想办法处理。如果调用系统D时,由于网络延迟,请求超时了,那系统A是反馈fail还是重试??
***,系统A的负责人,觉得隔一段时间就改来改去,没意思,于是就跑路了。
然后,公司招来一个大佬,大佬经过几天熟悉,上来就说:将系统A的userId写到消息队列中,这样系统A就不用经常改动了。为什么呢?下面我们来一起看看:
[![img](../media/pictures/MQ.assets/1657e6a177e2aa8f6223ea9544523ac5.jpg)](https://s4.51cto.com/oss/201904/15/1657e6a177e2aa8f6223ea9544523ac5.jpg)
系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据。这样有什么好处?
- 系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心。
- 即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。
- 系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,只跟消息队列有关。
这样一来,系统A与系统B、C、D都解耦了。
**2.2 异步**
我们再来看看下面这种情况:系统A还是直接调用系统B、C、D
[![img](../media/pictures/MQ.assets/d3580ccb52fa2d96b5492abec9425326.jpg)](https://s2.51cto.com/oss/201904/15/d3580ccb52fa2d96b5492abec9425326.jpg)
代码如下:
```java
public class SystemA {
SystemB systemB = new SystemB();
SystemC systemC = new SystemC();
SystemD systemD = new SystemD();
// 系统A独有的数据
private String userId ;
public void doOrder() {
// 下订单
userId = this.order();
// 如果下单成功,则安排其他系统做一些事
systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
systemD.SystemDNeed2do(userId);
}
}
```
假设系统A运算出userId具体的值需要50ms,调用系统B的接口需要300ms,调用系统C的接口需要300ms,调用系统D的接口需要300ms。那么这次请求就需要50+300+300+300=950ms
并且我们得知,系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处理一些小事而已。
那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。所以,我们可以弄成是这样的:
[![img](../media/pictures/MQ.assets/30d3012247c59f9a98e6b6277a921f9e.jpg)](https://s2.51cto.com/oss/201904/15/30d3012247c59f9a98e6b6277a921f9e.jpg)
系统A执行完了以后,将userId写到消息队列中,然后就直接返回了(至于其他的操作,则异步处理)。
- 本来整个请求需要用950ms(同步)
- 现在将调用其他系统接口异步化,从请求到返回只需要100ms(异步)
(例子可能举得不太好,但我觉得说明到点子上就行了,见谅。)
**2.3削峰/限流**
我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。
[![img](../media/pictures/MQ.assets/0c376fa4516d99066821976ec70f34e7.jpg)](https://s3.51cto.com/oss/201904/15/0c376fa4516d99066821976ec70f34e7.jpg)
那多出来的1000个请求,可能就把我们整个系统给搞崩了…所以,有一种办法,我们可以写到消息队列中:
[![img](../media/pictures/MQ.assets/6b9040fb486350734e162c9cf52e1c5a.jpg)](https://s2.51cto.com/oss/201904/15/6b9040fb486350734e162c9cf52e1c5a.jpg)
系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。
### **三、使用消息队列有什么问题?**
经过我们上面的场景,我们已经可以发现,消息队列能做的事其实还是蛮多的。
说到这里,我们先回到文章的开头,"明明 JDK已经有不少的队列实现了,我们还需要消息队列中间件呢?"其实很简单,JDK实现的队列种类虽然有很多种,但是都是简单的内存队列。为什么我说JDK是简单的内存队列呢?下面我们来看看要实现消息队列(中间件)可能要考虑什么问题。
**3.1高可用**
无论是我们使用消息队列来做解耦、异步还是削峰,消息队列肯定不能是单机的。试着想一下,如果是单机的消息队列,万一这台机器挂了,那我们整个系统几乎就是不可用了。
[![img](../media/pictures/MQ.assets/7be3dad31d267e3dd98963e9b9d676fb.jpg)](https://s1.51cto.com/oss/201904/15/7be3dad31d267e3dd98963e9b9d676fb.jpg)
所以,当我们项目中使用消息队列,都是得集群/分布式的。要做集群/分布式就必然希望该消息队列能够提供现成的支持,而不是自己写代码手动去实现。
**3.2 数据丢失问题**
我们将数据写到消息队列上,系统B和C还没来得及取消息队列的数据,就挂掉了。如果没有做任何的措施,我们的数据就丢了。
[![img](../media/pictures/MQ.assets/83ac129c760ecfc4c09728e99e1f7e32.jpg)](https://s3.51cto.com/oss/201904/15/83ac129c760ecfc4c09728e99e1f7e32.jpg)
学过Redis的都知道,Redis可以将数据持久化磁盘上,万一Redis挂了,还能从磁盘从将数据恢复过来。同样地,消息队列中的数据也需要存在别的地方,这样才尽可能减少数据的丢失。
那存在哪呢?
- 磁盘?
- 数据库?
- Redis?
- 分布式文件系统?
同步存储还是异步存储?
**3.3消费者怎么得到消息队列的数据?**
消费者怎么从消息队列里边得到数据?有两种办法:
- 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
- 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)
**3.4其他**
除了这些,我们在使用的时候还得考虑各种的问题:
- 消息重复消费了怎么办啊?
- 我想保证消息是绝对有顺序的怎么做?
- ……..
虽然消息队列给我们带来了那么多的好处,但同时我们发现引入消息队列也会提高系统的复杂性。市面上现在已经有不少消息队列轮子了,每种消息队列都有自己的特点,选取哪种MQ还得好好斟酌。
***\****
本文主要讲解了什么是消息队列,消息队列可以为我们带来什么好处,以及一个消息队列可能会涉及到哪些问题。希望给大家带来一定的帮助。
## ActiveMQ
## RocketMQ
### 面试题
RocketMQ必问,把上课讲的准备点就可以应付了,其实这块也可介绍一些其他的消息中间件
rocketmq怎么配置的?
### **消息队列都有哪些**
**答:**Kafka、ActiveMQ、RabbitMQ、RocketMQ(在用)
秒杀怎么做的 mq订阅方事务失败了怎么办
1.10RocketMQ用的哪种?全局排序还是分区排序?(真的不知道),如何分区的?
rocketMQ事务
订阅消息模式
RocketMQ消息队列的流程
为何用RocketMQ?和其他MQ相比,它的优点是什么?
分布式事务是通过MQ实现的?说一下RocketMQ事务流程**
### 是否了解rabbitMQ,是否了解MongoDB
6.用的是消息队列是什么?哪里用到了异步消息队列?
\10. 消息队列的原理?
12.rocketmq有哪些优势
14.rocketmq如何实现分布式事务?
参考:https://www.jianshu.com/p/824066d70da8
## Kafka
## RabbitMQ
学习这个MQ的时候,有个小细节,有时候需要先启动Recv1,Recv2,然后再用Send发送消息。
个人感觉这里其实是Recv在监听Listener生产者,如果生产者生产出来消息,消费者就会根据某种约定的规则来消费。
### 1.消息队列解决了什么问题
#### 异步处理
![1594020269426](../media/pictures/MQ.assets/1594020269426.png)
如果串行的话,显然很浪费时间,因为发短信和邮件这些并不是和主业务有关。
如果并行,就像交易所项目,将这些非业务相关放到线程池里面。
异步就是放到MQ里面啦。
#### 应用解耦
![1594020460950](../media/pictures/MQ.assets/1594020460950.png)
微服务秒杀系统,订单完成以后,要减库存,假如这时候减库存这个服务挂了怎么办?
这时候引入MQ,订单完成以后,放入MQ,即使是减库存服务挂掉,等它上线,去MQ拿取还是可以减库存的,这时候订单模块和减库存模块是完全解耦的。
#### 流量削峰
![1594021283046](../media/pictures/MQ.assets/1594021283046.png)
当时学秒杀的时候,用过这个,异步削峰。 req指的是请求。
#### 日志处理
### 2.Rabbit安装与配置
可以正常的安装下载linux或者wins版本。这里试试docker版本。
#### Docker下安装RabbitMQ
我们还是采用老规矩,在docker上安装最新版本
##### 1.拉取镜像
```
docker pull rabbitmq:3-management
```
![img](../media/pictures/MQ.assets/869262-20190617144708629-1279840356.png)
##### 2.查看镜像
```
docker images;
```
![img](../media/pictures/MQ.assets/869262-20190617144817128-740364497.png)
##### 3.启动镜像 生成docker容器 ,**默认用户名是guest ,密码是guest**
```shell
docker run --name rabbitmq -p 15672:15672 -p 5672:5672 1482b87815ec
```
![img](../media/pictures/MQ.assets/869262-20190617145436225-1650718155.png)
##### 4.如果在生成容器的时候设置用户名和密码的命令如下
```
docker run --name rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
```
##### 5.查看所有的docker容器
```
[root@holly ~]# docker ps -a
```
![img](../media/pictures/MQ.assets/869262-20190617150022232-206618987.png)
##### 6.启动rabbitmq容器
```
[root@holly ~]# docker start rabbitmq
```
![img](../media/pictures/MQ.assets/869262-20190617153408239-2136144192.png)
##### 7.查看已经启动的容器
![img](../media/pictures/MQ.assets/869262-20190617153458316-820400943.png)
##### 8.浏览器访问
![img](../media/pictures/MQ.assets/869262-20190617153542795-1948356413.png)
![img](../media/pictures/MQ.assets/869262-20190617153641500-765220633.png)
参考:https://www.cnblogs.com/holly8/p/11040009.html
##### ERROR:
在阿里云Docker安装RabbitMQ以后,阿里云安全组也都打开,但是外面访问不了主页。这时候需要:
```shell
#开启插件:首先使用命令进入容器
docker exec -it myrabbitmq bash
#开启插件命令:
rabbitmq-plugins enable rabbitmq_management
```
![1594026000747](../media/pictures/MQ.assets/1594026000747.png)
然后就可以访问了:
![1594026017460](../media/pictures/MQ.assets/1594026017460.png)
参考:https://www.cnblogs.com/hellohero55/p/11953882.html
#### 用户管理
##### 新建一个用户 用于开发
![1594026721078](../media/pictures/MQ.assets/1594026721078.png)
##### virtual hosts 管理
就相当于mysql 或者其他数据库 db
![1594026967549](../media/pictures/MQ.assets/1594026967549.png)
![1594027064737](../media/pictures/MQ.assets/1594027064737.png)
一般以/开头
![1594027621965](../media/pictures/MQ.assets/1594027621965.png)
给刚才添加的user赋予新的权限。
然后试试user_steve 123 是否可以登录、
![1594027760001](../media/pictures/MQ.assets/1594027760001.png)
登录成功。
看一些其他的信息。
![1594027971304](../media/pictures/MQ.assets/1594027971304.png)
其实用docker部署的时候是可以看到这些的。
### 3.Java操作rabbitmq
#### 1.simple 简单队列
##### 模型
![1594037378049](../media/pictures/MQ.assets/1594037378049.png)
P:消息的生产者
红色的队列
C:消息的消费者
##### 获取MQ连接
```java
package com.jdrx.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 获取MQ的连接
*
* @author Steve
* @date 2020/7/6-21:30
*/
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("47.92.208.93");
//AMQP 5672
factory.setPort(5672);
//vhost
factory.setVirtualHost("/vhost_steve");
//用户名
factory.setUsername("user_steve");
//密码
factory.setPassword("123");
return factory.newConnection();
}
}
```
这里有一个注意的点,如果在浏览器访问阿里云上面的RabbitMQ,则15672端口是可以访问的。
如果是,在代码中作为客户端使用,15672用不了,只能用5672。
猜测估计是,15672是server,5672是client。
服务5672启动以后,就可以
![1594082996051](../media/pictures/MQ.assets/1594082996051.png)
在服务器上,就可以看到。
![1594083019988](../media/pictures/MQ.assets/1594083019988.png)
##### 消息生产者
```java
package com.jdrx.rabbitmq.simple;
import com.jdrx.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Steve
* @date 2020/7/6-21:42
*/
public class Send {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "hello simple !";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("--send msg" + msg);
channel.close();
connection.close();
}
}
```
##### 消息消费者
```java
package com.jdrx.rabbitmq.simple;
import com.jdrx.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费消息
*
* @author Steve
* @date 2020/7/7-8:54
*/
public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建信道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("new Api msg" + msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
/**
* 旧的API 这里面的方法比较旧 有些已经废除啦
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public static void oldAPI() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建信道
Channel channel = connection.createChannel();
//定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());
System.out.println("[recv] msg:" + msgString);
}
}
}
```
##### 简单队列的不足
耦合性高,生产者一一对应消费者,如果想有多个消费者消费队列中的消息,这时候就不行啦
队列名变更,这时候得同时变更。
#### 2.work queues 工作多队列
##### 轮询分发
这个就是一个生产者对应多个消息消费者。
###### 模型
![1594087159028](../media/pictures/MQ.assets/1594087159028.png)
###### 为什么会出现工作队列?
Simple队列时一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,小份这接受到消息之后就需要处理,可能需要花费时间,这时候队列就会积压很多消息。
###### 生产者
```java
package com.jdrx.rabbitmq.work;
import com.jdrx.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* |----C1
* P-------|
* |----C2
*
* @author Steve
* @date 2020/7/7-10:05
*/
public class Send {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg = "hello " + i;
System.out.println("[WQ] send:" +msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}
```
###### 消费者1 停两秒
```java
package com.jdrx.rabbitmq.work;
import com.jdrx.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Steve
* @date 2020/7/7-10:13
*/
public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建信道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] Recv done");
}
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
```
###### 消费者2 停一秒
```java
package com.jdrx.rabbitmq.work;
import com.jdrx.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Steve
* @date 2020/7/7-10:13
*/
public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//创建信道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] Recv done");
}
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
```
###### 现象
消费者1和消费者2处理的数据时一样的,均分的。
```java
//消费者1
[1] Recv msghello 0
[1] Recv done
[1] Recv msghello 2
[1] Recv done
[1] Recv msghello 4
[1] Recv done
[1] Recv msghello 6
[1] Recv done
[1] Recv msghello 8
[1] Recv done
[1] Recv msghello 10
[1] Recv done
[1] Recv msghello 12
[1] Recv done
[1] Recv msghello 14
[1] Recv done
[1] Recv msghello 16
[1] Recv done
[1] Recv msghello 18
[1] Recv done
[1] Recv msghello 20
[1] Recv done
[1] Recv msghello 22
[1] Recv done
[1] Recv msghello 24
[1] Recv done
[1] Recv msghello 26
[1] Recv done
[1] Recv msghello 28
[1] Recv done
[1] Recv msghello 30
[1] Recv done
[1] Recv msghello 32
[1] Recv done
[1] Recv msghello 34
[1] Recv done
[1] Recv msghello 36
[1] Recv done
[1] Recv msghello 38
[1] Recv done
[1] Recv msghello 40
[1] Recv done
[1] Recv msghello 42
[1] Recv done
[1] Recv msghello 44
[1] Recv done
[1] Recv msghello 46
[1] Recv done
[1] Recv msghello 48
[1] Recv done
//消费者2
[2] Recv msghello 1
[2] Recv done
[2] Recv msghello 3
[2] Recv done
[2] Recv msghello 5
[2] Recv done
[2] Recv msghello 7
[2] Recv done
[2] Recv msghello 9
[2] Recv done
[2] Recv msghello 11
[2] Recv done
[2] Recv msghello 13
[2] Recv done
[2] Recv msghello 15
[2] Recv done
[2] Recv msghello 17
[2] Recv done
[2] Recv msghello 19
[2] Recv done
[2] Recv msghello 21
[2] Recv done
[2] Recv msghello 23
[2] Recv done
[2] Recv msghello 25
[2] Recv done
[2] Recv msghello 27
[2] Recv done
[2] Recv msghello 29
[2] Recv done
[2] Recv msghello 31
[2] Recv done
[2] Recv msghello 33
[2] Recv done
[2] Recv msghello 35
[2] Recv done
[2] Recv msghello 37
[2] Recv done
[2] Recv msghello 39
[2] Recv done
[2] Recv msghello 41
[2] Recv done
[2] Recv msghello 43
[2] Recv done
[2] Recv msghello 45
[2] Recv done
[2] Recv msghello 47