Spring Boot笔记-接收RabbitMQ队列中的消息
发布日期:2021-06-30 11:01:51 浏览次数:2 分类:技术文章

本文共 4901 字,大约阅读时间需要 16 分钟。

目录

 

 


 

基本概念

首先有个关键:此处实验接收的数据类型为Order,这里要求发送和接收要一模一样。

包括包名和类名都要一模一样:

如下,consumerDemo

下面是productorDemo

这里,包名和类都一模一样否则接收端监听会失败!

 

在消费(接收订阅)端要配置一些数据:

spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.listener.simple.acknowledge-mode=manual

这里指的是目前并发为5个,最大并发数为10个,监听确认为手动,也就是接收了数据,要给RabbitMQ给一个反馈信息

如下

这里有2个注解,是简单使用RabbitMQ的关键!

@RabbitListener(bindings = @QueueBinding(            value = @Queue(value = "order-queue", durable = "true"),            exchange = @Exchange(name = "order-exchange", type = "topic"),            key = "order.#"))    @RabbitHandler    public void onOrderMessage(@Payload Order order,                               @Headers Map
headers, Channel channel //手工签收需要rabbitMQ的通道 ) throws Exception{ .... .... ....}

这里如果没有对应的交换机和队列,那么此处就会自动新建

 

代码与实例

发送端不停的发送消息!接收端如下:

发送端关键代码:

OderSender.java

package SpringBoot.demo.produce;import SpringBoot.demo.entity.Order;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class OderSender {    @Autowired    private RabbitTemplate rabbitTemplate;    public void send(Order order) throws Exception{        CorrelationData correlationData = new CorrelationData();        correlationData.setId(order.getMessageId());        rabbitTemplate.convertAndSend("order-exchange",     //exchange                "order.abcd",       //routingKey                order,                          //消息体                correlationData);               //correlationData消息唯一ID    }}

DemoApplicationTests.java

package SpringBoot.demo;import SpringBoot.demo.entity.Order;import SpringBoot.demo.produce.OderSender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;@RunWith(SpringRunner.class)@SpringBootTestpublic class DemoApplicationTests {    @Autowired    private OderSender oderSender;    @Test    public void contextLoads() {    }    @Test    public void testSend1()throws Exception{        Order order = new Order();        order.setId("20180618000000000003");        order.setName("测试订单3");        order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());        oderSender.send(order);    }}

application.properties

server.port=8001spring.rabbitmq.addresses=192.168.164.141:5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/spring.http.encoding.charset=UTF-8spring.jackson.data-format=yyyy-MM-dd HH:mm:ssspring.jackson.time-zone=GMT+8spring.jackson.default-property-inclusion=NON_NULL

 

接收端关键如下:

OrderReceiver.java

package SpringBoot.demo.consumer;import SpringBoot.demo.entity.Order;import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class OrderReceiver {    @RabbitListener(bindings = @QueueBinding(            value = @Queue(value = "order-queue", durable = "true"),            exchange = @Exchange(name = "order-exchange", type = "topic"),            key = "order.#"))    @RabbitHandler    public void onOrderMessage(@Payload Order order,                               @Headers Map
headers, Channel channel //手工签收需要rabbitMQ的通道 ) throws Exception{ //消费者操作 System.out.println("--------------收到消息,开始消费--------------"); System.out.println("订单ID:" + order.getId()); //告诉RabbitMQ我已经签收了 long deliveryTag = (long)headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); //false为不支持批量签收 }}

application.properties

server.port=8002spring.rabbitmq.addresses=192.168.164.141:5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/spring.http.encoding.charset=UTF-8spring.jackson.data-format=yyyy-MM-dd HH:mm:ssspring.jackson.time-zone=GMT+8spring.jackson.default-property-inclusion=NON_NULL#配置关于consumer相关的spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.listener.simple.acknowledge-mode=manual#限流,同一时间只有一条消息消费spring.rabbitmq.listener.simple.prefetch=1

 

源码下载地址:

转载地址:https://it1995.blog.csdn.net/article/details/92769321 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:C++设计模式-外观模式
下一篇:Spring Boot笔记-发送消息给RabbitMQ

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月26日 12时48分48秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章