【RabbitMQ-1】SpringBoot2.x的集成
发布日期:2022-02-14 16:09:33 浏览次数:23 分类:技术文章

本文共 7339 字,大约阅读时间需要 24 分钟。

1. 环境配置

环境:JDK8SpringBoot 2.1.3.RELEASE

依赖:

org.springframework.boot
spring-boot-starter-amqp

2. 提供者配置

配置文件:

spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest    publisher-confirms: true  # 确保消息不丢失    publisher-returns: true # 确保消息不丢失

注册组件:

因为RabbitMQ中Exchange有三种模式:订阅、路由、通配符。

  1. direct(路由模式):根据routingKey值选择对应的Binding。

direct模式.png

  1. fanout(订阅模式):每个和交换机绑定的队列都会收到消息,相当于广播。

fanout模式.png

  1. topic(通配符模式):支持routingKey的模糊匹配选择Binding。

topic模式.png

用户可以在项目启动时,向MQ注册一些Exchange和Queue。用户可以自由组合,**注册Binding关系。**它们使用routingKey进行区别。

生产者发送消息需要指定ExchangeroutingKey。请求到达Exchange后,根据Exchange的模式和routingKey找到一个或一组Binding关系。并将消息发送Binding对应到Queue中。

消费者监听对应的Queue,来处理消息。


  1. direct也称为路由模式,消息到达Exchange后,会根据指定的routingKey路由到指定routingKey的Binding上。
    所以需要在注册Binding时,指定各个Binding的路由键。
@Configurationpublic class DirectConfig {
@Bean("directMessage1") public Queue directQueue1() {
//name才是queue的名字,消费者实际监听的是dirQueue-1队列 return new Queue("dirQueue-1"); } @Bean("directMessage2") public Queue directQueue2() {
return new Queue("dirQueue-2"); } //注册交换机 @Bean public DirectExchange directExchange() {
return new DirectExchange("directExchange"); } @Bean public Binding bindingDirectExchange1(@Qualifier("directMessage1") Queue queue, DirectExchange directExchange) {
String routingKey = "directExchange.message-1"; return BindingBuilder.bind(queue).to(directExchange).with(routingKey); } @Bean public Binding bindingDirectExchange2(@Qualifier("directMessage2") Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("directExchange.message-2"); }}
  1. fanout模式不需要指定routingKey关系,消息发送到Exchange后,将分发到与交换机绑定的各个Queue中。也称为订阅-发布模式。
@Configurationpublic class FanooutConfig {
@Bean(name = "AMessage") public Queue fanAMessage() {
return new Queue("fanout.A"); } @Bean(name = "BMessage") public Queue fanBMessage() {
return new Queue("fanout.B"); } @Bean(name = "CMessage") public Queue fanCMessage() {
return new Queue("fanout.C"); } //广播模式 @Bean public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(@Qualifier("AMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("BMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange); } @Bean Binding bindingExchangeC(@Qualifier("CMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange); }}
  1. topic通配符模式,注册的Binding的routingKey可以使用#或者*来进行模糊匹配。

消息到达Exchange后,使用指定的routingKey模糊匹配到注册的routingKey

  • * 代表的是一个单词。
  • # 代表的是一个或多个单词。
@Configurationpublic class TopicConfig {
@Bean("message") public Queue queueMessage() {
return new Queue("topic.message"); } @Bean("messages") public Queue queueMessages() {
return new Queue("topic.messages"); } @Bean public TopicExchange exchange(){
return new TopicExchange("topicExchange"); } //普通绑定 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue message, TopicExchange exchange){
return BindingBuilder.bind(message).to(exchange).with("topic.message"); } //通配符绑定 @Bean("topicBindingExchangeMessages") Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessage,TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#"); }}

生产者发送消息:

@RestControllerpublic class RabbitMQController {
@Autowired private RabbitTemplate rabbitTemplate; //直接向队列中发送数据 @GetMapping("send") public String send() {
String content = "Date:" + System.currentTimeMillis(); rabbitTemplate.convertAndSend("kinson", content); return content; } @GetMapping("sendDirect") public Book sendDirect() {
Book book = new Book(); book.setId("001"); book.setName("JAVA编思想"); book.setPrice(100); book.setInfo("学习JAVA必备"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("directExchange", "directExchange.message", book, correlationData); return book; } @GetMapping("sendFanout") public Book sendFanout() {
Book book = new Book(); book.setId("005"); book.setName("深入理解JVM虚拟机"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("fanoutExchange", "" , book, correlationData); return book; } @GetMapping("sendTopic") public Book sendTopic() {
Book book = new Book(); book.setId("003"); book.setName("mysql高性能优化"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("topicExchange", "topic.message" , book, correlationData); return book; } /** * * 可以代替一个单词。 * # 可以替代零个或多个单词。 */ @GetMapping("sendTopic2") public Book sendTopic2() {
Book book = new Book(); book.setId("004"); book.setName("高并发实战"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("topicExchange", "topic.xxx" , book, correlationData); return book; }}

3. 消费者配置

spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest    listener:      simple:#        acknowledge-mode: manual  # 手动确定(默认自动确认)        concurrency: 5 # 消费端的监听个数        max-concurrency: 10 # 消费端的监听最大个数    connection-timeout: 15000   # 超时时间
@Component@Slf4jpublic class MyReceiver1 {
@RabbitListener(queues = {
"kinson"}) public void receiver(Message msg, Channel channel) {
byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) {
//打印数据 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("开始消费:{}\n channel:{}", message, channel); } } //监听的Queue //没有找到监听的Queue启动时会出现的异常:(reply-code=404, reply-text=NOT_FOUND - no queue 'directMessage' in vhost '/', class-id=50, method-id=10) @RabbitListener(queues = "dirQueue-1") public void receiverDirect(Message msg, Channel channel) throws IOException, ClassNotFoundException {
log.info("【DirectExchange 绑定的队列】"); byte[] messageBytes = msg.getBody(); Book book = (Book) deserializable(messageBytes); log.info("开始消费:[{}]", JSON.toJSONString(book)); } public static Object deserializable(byte[] bytes) throws IOException, ClassNotFoundException {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); return in.readObject(); }}

推荐阅读

转载地址:https://blog.csdn.net/qq_29595463/article/details/107856970 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【RabbitMQ-3】连接池的配置
下一篇:【RabbitMQ-4】拉模式和推模式

发表评论

最新留言

不错!
[***.144.177.141]2024年04月24日 05时36分59秒