【RabbitMQ-1】SpringBoot2.x的集成
发布日期:2022-02-14 16:09:33
浏览次数:23
分类:技术文章
本文共 7339 字,大约阅读时间需要 24 分钟。
1. 环境配置
环境:JDK8SpringBoot 2.1.3.RELEASE
依赖:
org.springframework.boot spring-boot-starter-amqp
2. 提供者配置
配置文件:
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirms: true # 确保消息不丢失 publisher-returns: true # 确保消息不丢失
注册组件:
因为RabbitMQ中Exchange有三种模式:订阅、路由、通配符。
- direct(路由模式):根据routingKey值选择对应的Binding。
- fanout(订阅模式):每个和交换机绑定的队列都会收到消息,相当于广播。
- topic(通配符模式):支持routingKey的模糊匹配选择Binding。
用户可以在项目启动时,向MQ注册一些Exchange和Queue。用户可以自由组合,**注册Binding关系。**它们使用routingKey进行区别。
生产者发送消息需要指定Exchange
和routingKey
。请求到达Exchange后,根据Exchange的模式和routingKey
找到一个或一组Binding关系。并将消息发送Binding对应到Queue中。
消费者监听对应的Queue,来处理消息。
- direct也称为路由模式,消息到达Exchange后,会根据指定的
routingKey
路由到指定routingKey
的Binding上。 所以需要在注册Binding时,指定各个Binding的路由键。
@Configurationpublic class DirectConfig { @Bean("directMessage1") public Queue directQueue1() { //name才是queue的名字,消费者实际监听的是dirQueue-1队列 return new Queue("dirQueue-1"); } @Bean("directMessage2") public Queue directQueue2() { return new Queue("dirQueue-2"); } //注册交换机 @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public Binding bindingDirectExchange1(@Qualifier("directMessage1") Queue queue, DirectExchange directExchange) { String routingKey = "directExchange.message-1"; return BindingBuilder.bind(queue).to(directExchange).with(routingKey); } @Bean public Binding bindingDirectExchange2(@Qualifier("directMessage2") Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("directExchange.message-2"); }}
- fanout模式不需要指定
routingKey
关系,消息发送到Exchange后,将分发到与交换机绑定的各个Queue中。也称为订阅-发布模式。
@Configurationpublic class FanooutConfig { @Bean(name = "AMessage") public Queue fanAMessage() { return new Queue("fanout.A"); } @Bean(name = "BMessage") public Queue fanBMessage() { return new Queue("fanout.B"); } @Bean(name = "CMessage") public Queue fanCMessage() { return new Queue("fanout.C"); } //广播模式 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(@Qualifier("AMessage") Queue message, FanoutExchange fanoutExchange) { return BindingBuilder.bind(message).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("BMessage") Queue message, FanoutExchange fanoutExchange) { return BindingBuilder.bind(message).to(fanoutExchange); } @Bean Binding bindingExchangeC(@Qualifier("CMessage") Queue message, FanoutExchange fanoutExchange) { return BindingBuilder.bind(message).to(fanoutExchange); }}
- topic通配符模式,注册的Binding的
routingKey
可以使用#或者*来进行模糊匹配。
消息到达Exchange后,使用指定的routingKey
模糊匹配到注册的routingKey
。
- * 代表的是一个单词。
- # 代表的是一个或多个单词。
@Configurationpublic class TopicConfig { @Bean("message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean("messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange(){ return new TopicExchange("topicExchange"); } //普通绑定 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue message, TopicExchange exchange){ return BindingBuilder.bind(message).to(exchange).with("topic.message"); } //通配符绑定 @Bean("topicBindingExchangeMessages") Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessage,TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#"); }}
生产者发送消息:
@RestControllerpublic class RabbitMQController { @Autowired private RabbitTemplate rabbitTemplate; //直接向队列中发送数据 @GetMapping("send") public String send() { String content = "Date:" + System.currentTimeMillis(); rabbitTemplate.convertAndSend("kinson", content); return content; } @GetMapping("sendDirect") public Book sendDirect() { Book book = new Book(); book.setId("001"); book.setName("JAVA编思想"); book.setPrice(100); book.setInfo("学习JAVA必备"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("directExchange", "directExchange.message", book, correlationData); return book; } @GetMapping("sendFanout") public Book sendFanout() { Book book = new Book(); book.setId("005"); book.setName("深入理解JVM虚拟机"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("fanoutExchange", "" , book, correlationData); return book; } @GetMapping("sendTopic") public Book sendTopic() { Book book = new Book(); book.setId("003"); book.setName("mysql高性能优化"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("topicExchange", "topic.message" , book, correlationData); return book; } /** * * 可以代替一个单词。 * # 可以替代零个或多个单词。 */ @GetMapping("sendTopic2") public Book sendTopic2() { Book book = new Book(); book.setId("004"); book.setName("高并发实战"); String id = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(id); rabbitTemplate.convertAndSend("topicExchange", "topic.xxx" , book, correlationData); return book; }}
3. 消费者配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple:# acknowledge-mode: manual # 手动确定(默认自动确认) concurrency: 5 # 消费端的监听个数 max-concurrency: 10 # 消费端的监听最大个数 connection-timeout: 15000 # 超时时间
@Component@Slf4jpublic class MyReceiver1 { @RabbitListener(queues = { "kinson"}) public void receiver(Message msg, Channel channel) { byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) { //打印数据 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("开始消费:{}\n channel:{}", message, channel); } } //监听的Queue //没有找到监听的Queue启动时会出现的异常:(reply-code=404, reply-text=NOT_FOUND - no queue 'directMessage' in vhost '/', class-id=50, method-id=10) @RabbitListener(queues = "dirQueue-1") public void receiverDirect(Message msg, Channel channel) throws IOException, ClassNotFoundException { log.info("【DirectExchange 绑定的队列】"); byte[] messageBytes = msg.getBody(); Book book = (Book) deserializable(messageBytes); log.info("开始消费:[{}]", JSON.toJSONString(book)); } public static Object deserializable(byte[] bytes) throws IOException, ClassNotFoundException { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); return in.readObject(); }}
推荐阅读
转载地址:https://blog.csdn.net/qq_29595463/article/details/107856970 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
不错!
[***.144.177.141]2024年04月24日 05时36分59秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
第二技能
2019-04-27
算法的设计
2019-04-27
linux c : get curent tty info
2019-04-27
JAVA Freemarker(9)---常见语法大全
2019-04-27
Java MyBatis(1)--- Generator 详解
2019-04-27
Java MyBatis(2)--- generatorConfig.xml详解与运行
2019-04-27
VueJS(5)---初步练习(5题)
2019-04-27
mysql(3)-- 修改root密码命令小结
2019-04-27
JQuery(3)--冒泡效果
2019-04-27
异常(2)-- UnsatisfiedLinkError: dalvik.system.PathClassLoader[DexPathList[[zip file "/data/app/项目包名
2019-04-27
Android软键盘(1)---输入法界面管理(打开/关闭/状态获取)
2019-04-27
Android动态设置view的高度宽度
2019-04-27
css3 属性 text-overflow 实现截取多余文字内容 以省略号来代替多余内容
2019-04-27
vue 事件总线EventBus的概念、使用以及注意点
2019-04-27
JavaScript 用七种方式教你判断一个变量是否为数组类型
2019-04-27
细讲前端设置cookie, 储存用户登录信息
2019-04-27
Web前端安全策略之CSRF的攻击与防御
2019-04-27
斯坦福CS236-深度生成模型2019-全套课程资料分享
2019-04-27
历史最全DL相关书籍、课程、视频、论文、数据集、会议、框架和工具整理分享
2019-04-27