SpringBoot集成RabbitMQ实现发布确认
发布日期:2022-02-10 08:11:08
浏览次数:22
分类:技术文章
本文共 5457 字,大约阅读时间需要 18 分钟。
前言
何为发布确认?
字面意思:P发送消息之后,能够确定C收到消息了
问题
P发送消息要通过交换机和队列,所以只需要确保这二者都不出问题,或者有PlanB来防止出意外即可
问题一:交换机出问题
不明原因,导致交换机重启,在重启过程中,生产者投递消息失败,导致消息丢失;
交换机宕机了,没有集群
。。。
问题二:队列出问题
交换机和队列之间的bingKey错误,导致找不到指定队列;
队列TTL已到
。。。
解决方案
交换机问题解决
交换机是否收到消息,生产者是不知道的,所以需要写一个回调方法,通知生产者,消息是否被交换机接收到
队列问题解决
交换机是否找到对应队列,交换机如果找不到对应队列,会将消息丢掉,这种情况是不允许发生的,所以需要把不可达的消息返回交换机,交给其他(备份)交换机重新发送
流程图
实现步骤
声明配置
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest publisher-confirm-type: correlated #开启发布确认 默认是关闭的 publisher-returns: true #开启消息回退 交换机找不到队列时,会直接丢弃消息,回退给生产者
声明 确定队列,备份队列,警告队列,业务交换机,备份交换机,以及它们之间的绑定关系
@Configurationpublic class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; //声明确定队列 @Bean("confirQueue") public Queue queue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //声明业务交换机 @Bean("confirExchange") public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } //绑定确定队列和业务交换机 @Bean public Binding bindingQueueToExchange(@Qualifier("confirQueue") Queue confirQueue, @Qualifier("confirExchange")DirectExchange confirExchange){ return BindingBuilder.bind(confirQueue).to(confirExchange).with("key1"); } //声明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //声明备份队列 @Bean("backQueue") public Queue backQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } //声明报警队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } //声明备份队列和 备份交换机之间的绑定关系 @Bean public Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); } //声明报警队列和 备份交换机之间的绑定关系 @Bean public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){ return BindingBuilder.bind(queue).to(backupExchange); }}
声明回滚类
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{ /** * 交换机是否收到消息的一个回调方法 * * @param correlationData 消息相关数据 * @param ack 交换机是否收到消息 * @param s 失败 的原因,成功的话为null */ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { String id = correlationData!=null?correlationData.getId():""; if (ack){ log.info("交换机已收到id为:{}",id); }else{ log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,s); } } /** * 只有当消息传递过程中不可达目的地时,将消息返回给生产者 * @param returnedMessage */ @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}",returnedMessage.getMessage(),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey()); }}
声明生产者
@RestController@RequestMapping("/confirm")@Slf4jpublic class ConfirmProducer { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MyCallBack myCallBack; //依赖注入rabbitMQ之后再设置它的回调对象 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(myCallBack); rabbitTemplate.setReturnsCallback(myCallBack); } //发送消息 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ //指定消息id为1 CorrelationData correlationData1 = new CorrelationData("1"); String routintKey = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routintKey,message+routintKey,correlationData1); log.info("发送的消息是:{}",message); //指定消息id为2 CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routintKey+123,message+routintKey,correlationData2); log.info("发送的消息是:{}",message); }}
声明确认消费者
@Component@Slf4jpublic class ConfirmCustomer { public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMsg(Message message){ String msg = new String(message.getBody()); log.info("接收确定队列传来的消息:{}",msg); }}
声明警告消费者
@Component@Slf4jpublic class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning.queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message) { String msg = new String(message.getBody()); log.error("报警发现不可路由消息:{}", msg); }}
启动测试
http://localhost:8080/confirm/sendMessage/你爱我,我爱你
转载地址:https://blog.csdn.net/qq_43612495/article/details/120783017 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年03月29日 01时43分36秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Unity中获取物体的尺寸(size)的三种方法
2019-04-27
Unity中的关节组件和绳子效果的实现
2019-04-27
Unity可视化编程插件: Bolt,可以像UE4的蓝图那样啦
2019-04-27
Android的.dex、.odex与.oat文件扫盲
2019-04-27
Unity移动应用如何在Bugly上查看崩溃堆栈
2019-04-27
一分钟搞明白Android的.so文件、ABI和CPU的关系
2019-04-27
UGUI的Text描边Outline拓展
2019-04-27
游戏性能指标参考,游戏质量白皮书下载
2019-04-27
游戏帧同步学习笔记
2019-04-27
Mac苹果电脑分辨率不够用,安装SwitchResX这个软件完美解决
2019-04-27
iOS Info.plist知多少
2019-04-27
XCode9之后命令打包需要使用OptionExport.plist
2019-04-27
关于iOS XCode的entitlements文件
2019-04-27
Airtest自动化测试神器,教你实现Unity自动化测试
2019-04-27
模拟器连接端口汇总和常用ADB命令
2019-04-27
ShaderGraph使用教程与各种特效案例:Unity2020(持续更新)
2019-04-27