【RabbitMQ-2】RabbitMQ的并发参数(concurrency和prefetch)
发布日期:2022-02-14 16:09:33 浏览次数:29 分类:技术文章

本文共 2377 字,大约阅读时间需要 7 分钟。

文章目录

1. 环境配置

版本信息:JDK:8SpringBoot 2.1.3.RELEASE

RabbitMQ消费端配置

spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest    listener:      simple:#        acknowledge-mode: manual  # 手动确定(默认自动确认)        concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)        max-concurrency: 10 # 消费端的监听最大个数        prefetch: 10    connection-timeout: 15000   # 超时时间

在消费端,配置prefetchconcurrency参数便可以实现消费端MQ并发处理消息,那么这两个参数到底有什么含义??

2. prefetch属性

每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。

对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1

模拟:

生产者每次生产10条消息:

spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest    publisher-confirms: true    publisher-returns: true

JAVA代码:

@RestControllerpublic class RabbitMQController {
@Autowired private RabbitTemplate rabbitTemplate; //直接向队列中发送数据 @GetMapping("send") public String send() {
for (int i = 0; i < 10; i++) {
String content = "Date:" + System.currentTimeMillis(); content = content + ":::" + i; rabbitTemplate.convertAndSend("kinson", content); } return "success"; }}

3. concurrency属性

上面配置中,concurrency =1,即每个Listener容器将开启一个线程去处理消息。

在2.0版本后,可以在注解中配置该参数:

@Component@Slf4jpublic class CustomerRev {
//会覆盖配置文件中的参数。 @RabbitListener(queues = {
"kinson"},concurrency = "2") public void receiver(Message msg, Channel channel) throws InterruptedException {
// Thread.sleep(10000); byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) {
//打印数据 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("【消3】:{}", message); } }}

启动服务:

可以看到MQ有两个消费者.png

即该Listener容器产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。

image.png

4. prefetch和concurrency属性

若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息)。

下图所示,当10个消息进入MQ后,两个线程都抓取了5个线程放入了自己的LinkedBlockingQueue进行消费。

image.png

推荐阅读

转载地址:https://blog.csdn.net/qq_29595463/article/details/107857172 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Java agent从0到1的踩坑过程
下一篇:【RabbitMQ-3】连接池的配置

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月10日 16时23分57秒