【RabbitMQ-8】SpringBoot2.x动态的创建Queue、Exchange、VirtualHost、Binding
发布日期:2022-02-14 16:09:34
浏览次数:25
分类:技术文章
本文共 16799 字,大约阅读时间需要 55 分钟。
文章目录
1 项目启动时,初始化MQ配置
因为项目中可能存在多个MQ的连接,所以舍弃了yaml配置MQ的做法,而是在JAVA代码中声明CachingConnectionFactory
连接工厂,去配置RabbitAdmin
和RabbitTemplate
。
import com.tellme.entity.RabbitVirtualHost;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.Connection;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ConcurrentHashMap;/** * 初始化——构造方法! * 初始化——@PostConstruct方法 * 初始化——InitializingBean接口 * 初始化——init方法! * ---容器启动完毕后... * 容器启动—CommandLineRunner接口方法! */@Slf4j@Componentpublic class RabbitContextHolder { private static ConcurrentHashMapvhostMapping = new ConcurrentHashMap<>(); private static ConcurrentHashMap rabbitConnectionMapping = new ConcurrentHashMap<>(); private static ConcurrentHashMap adminMapping = new ConcurrentHashMap<>(); private static ConcurrentHashMap templateMapping = new ConcurrentHashMap<>(); private static List rabbitVirtualHosts = new ArrayList<>(); /** * 初始化的连接配置。 */ static { RabbitVirtualHost rabbitVirtualHost = new RabbitVirtualHost(); rabbitVirtualHost.setHost("localhost"); rabbitVirtualHost.setPort(5672); rabbitVirtualHost.setUsername("guest"); rabbitVirtualHost.setPassword("guest"); rabbitVirtualHost.setVhost("/test"); RabbitVirtualHost r2 = new RabbitVirtualHost(); r2.setHost("localhost"); r2.setPort(5672); r2.setUsername("guest"); r2.setPassword("guest"); r2.setVhost("pigeon"); rabbitVirtualHosts.add(rabbitVirtualHost); rabbitVirtualHosts.add(r2); } /** * 项目启动时,会初始化队列。 */ @PostConstruct void initContainer() { //读取多个RabbitMq的配置,多个虚拟主机。 rabbitVirtualHosts.forEach(RabbitContextHolder::addNewVHost); } /** * 根据MQ连接信息去初始化 * {@link CachingConnectionFactory} * {@link RabbitAdmin} * {@link RabbitTemplate} * 配置并放入到内存中。 * * @param conn RabbitMq的连接信息 */ public static void addNewVHost(RabbitVirtualHost conn) { //创建连接工厂 CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(conn.getHost()); factory.setPort(conn.getPort()); factory.setVirtualHost(conn.getVhost()); factory.setUsername(conn.getUsername()); factory.setPassword(conn.getPassword()); //保证生产者不丢消息 factory.setPublisherReturns(true); factory.setPublisherConfirms(true); try { Connection connection = factory.createConnection(); if (!connection.isOpen()) { log.error("Rabbit的连接工厂创建失败,虚拟主机为[{}]", conn.getVhost()); } RabbitAdmin rabbitAdmin = new RabbitAdmin(factory); rabbitAdmin.setAutoStartup(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); //统一对异常的处理 rabbitTemplate.setReplyErrorHandler(t -> { log.error("进行重试了下哈~"); }); //交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式 //true:RabbitMQ会调用Basic.Return命令将消息返回给生产者 //false:RabbitMQ会把消息直接丢弃 rabbitTemplate.setMandatory(true); //放入到各个缓存中 rabbitConnectionMapping.put(conn.getVhost(), factory); adminMapping.put(conn.getVhost(), rabbitAdmin); templateMapping.put(conn.getVhost(), rabbitTemplate); vhostMapping.put(conn.getVhost(), conn); } catch (Exception e) { log.error("初始化connection factory 失败,虚拟主机为[{}]", conn.getVhost(), e); } } /** * 移除内存中的配置信息 * * @param vhost 虚拟机对象 */ public static void delVHost(String vhost) { rabbitConnectionMapping.remove(vhost); adminMapping.remove(vhost); templateMapping.remove(vhost); vhostMapping.remove(vhost); } public static CachingConnectionFactory getConnectionFactory(String vhost) { return rabbitConnectionMapping.get(vhost); } public static RabbitAdmin getRabbitAdmin(String vhost) { return adminMapping.get(vhost); } public static RabbitTemplate getRabbitTemplate(String vhost) { return templateMapping.get(vhost); } public static RabbitVirtualHost getVHost(String vhost) { return vhostMapping.get(vhost); }}
实体类配置:
@Datapublic class RabbitVirtualHost { /** * Mq的Ip地址 */ String host; /** * Mq的端点 */ Integer port; /** * 虚拟主机 */ String vhost; /** * Mq的用户名 */ String username; /** * Mq的密码 */ String password;}
2 使用RabbitAdmin动态创建
使用
RabbitAdmin
去动态的创建MQ的相关组件,对应工具类如下所示:
@Slf4jpublic abstract class RabbitHandlerUtils { private final static String DELAYED_TYPE = "x-delayed-type"; private final static String DELAYED_MESSAGE = "x-delayed-message"; /** * 创建交换机 * * @param rabbitExchange 创建交换机的对象 * @return 交换机 */ public static Exchange createExchange(RabbitExchange rabbitExchange) { RabbitAdmin admin = RabbitContextHolder.getRabbitAdmin(rabbitExchange.getVhost()); Exchange exchange = initExchange(rabbitExchange); admin.declareExchange(exchange); return exchange; } /** * 移除交换机 * * @param vhostName 虚拟主机名 * @param exchangeName 交换机的名称 * @return */ public static boolean deleteExchange(String vhostName, String exchangeName) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(vhostName); return rabbitAdmin.deleteExchange(exchangeName); } /** * 创建队列 * * @param rabbitQueue * @return */ public static Queue createQueue(RabbitQueue rabbitQueue) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(rabbitQueue.getVhost()); String queueName = rabbitQueue.getName(); Queue queue = new Queue(queueName); if (queueExist(queueName, rabbitAdmin)) { throw new RuntimeException("The queue " + rabbitQueue.toString() + " 已经存在。"); } BeanUtils.copyProperties(rabbitQueue, queue); rabbitAdmin.declareQueue(queue); return queue; } /** * 移除队列 * * @param vhostName 虚拟主机名 * @param queueName 队列名 */ public static void deleteQueue(String vhostName, String queueName) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(vhostName); rabbitAdmin.deleteQueue(queueName); } /** * 创建绑定关系 */ public static void bind(RabbitBinding binding) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(binding.getVhost()); Binding b = new Binding(binding.getQueue(), Binding.DestinationType.QUEUE, binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); rabbitAdmin.declareBinding(b); } /** * 解绑操作 */ public static void unbind(RabbitBinding binding) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(binding.getVhost()); Binding b = new Binding(binding.getQueue(), Binding.DestinationType.QUEUE, binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); rabbitAdmin.removeBinding(b); } /** * 清空队列 * * @param vhostName 虚拟主机名 * @param queueName 队列名 * @param noWait 是否等待,true是异步清空,false是同步清空 */ public static void purgeQueue(String vhostName, String queueName, boolean noWait) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(vhostName); rabbitAdmin.purgeQueue(queueName, noWait); } /** * 获取到消息的数量 * * @param vhostName 虚拟主机名 * @param queueName 队列名 * @return 队列中消息的数量 */ public static int getMessageCount(String vhostName, String queueName) { RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(vhostName); if (isEmpty(queueName)) { throw new RuntimeException("Queue name can not be null"); } Integer messageCount = rabbitAdmin.getRabbitTemplate().execute(channel -> { try { AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(queueName); return declareOk.getMessageCount(); } catch (Exception e) { log.error("获取队列消息识别[{}]", queueName, e); return -1; } }); return messageCount == null ? 0 : messageCount; } /** * 判断队列是否存在 * * @param queueName 队列名 * @param rabbitAdmin 某连接配置下的{@link RabbitAdmin} * @return true表示存在,false表示不存在 */ private static boolean queueExist(String queueName, RabbitAdmin rabbitAdmin) { String name = rabbitAdmin.getRabbitTemplate().execute(channel -> { try { //若找不到,直接会抛出404的错误 AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(queueName); return declareOk.getQueue(); } catch (Exception e) { log.error("查询异常", e); return null; } }); return StringUtils.isNotBlank(name); } /** * 调用Mq创建虚拟主机 * * @param vhost 虚拟主机配置 * @throws IOException */ public static void createVHost(RabbitVirtualHost vhost) throws IOException { HttpClient client = HttpClients.createDefault(); HttpPut httpPut = new HttpPut(); String authTemplate = "%s:%s"; String authString = String.format(authTemplate, vhost.getUsername(), vhost.getPassword()); String encoding = DatatypeConverter.printBase64Binary(authString.getBytes(StandardCharsets.UTF_8)); httpPut.setHeader("content-type", ContentType.APPLICATION_JSON.toString()); httpPut.setHeader("Authorization", "Basic " + encoding); String hostTemplate = "http://%s:%d"; String apiTemplate = "http://%s:%d/api/vhosts/%s"; String host = String.format(hostTemplate, vhost.getHost(), 15672); String api = String.format(apiTemplate, vhost.getHost(), 15672, vhost.getVhost()); httpPut.setURI(URI.create(api)); HttpResponse response = client.execute(HttpHost.create(host), httpPut); log.info("创建Rabbit虚拟主机的配置 : " + api + " : " + response); Assert.assertTrue(response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 || response.getStatusLine().getStatusCode() == 204); } /** * 初始化交换机 */ private static Exchange initExchange(RabbitExchange rabbitExchange) { //判断是否是延迟队列 if (rabbitExchange.isDelayed()) { //定义延迟队列 Maparguments = new HashMap<>(); arguments.put(DELAYED_TYPE, rabbitExchange.getType().name().toLowerCase()); return new CustomExchange(rabbitExchange.getName(), DELAYED_MESSAGE, rabbitExchange.isDurable(), rabbitExchange.isAutoDelete(), arguments); } switch (rabbitExchange.getType()) { case DIRECT: //直连模式 return new DirectExchange(rabbitExchange.getName(), //交换机是否持久化 rabbitExchange.isDurable(), //当所有的绑定关系被删除时,自动删除队列 rabbitExchange.isAutoDelete(), //交换器的其他参数,可以为空 rabbitExchange.getArguments()); case TOPIC: //通配符模式 return new TopicExchange(rabbitExchange.getName(), rabbitExchange.isDurable(), rabbitExchange.isAutoDelete(), rabbitExchange.getArguments()); case FANOUT: //广播模式 return new FanoutExchange(rabbitExchange.getName(), rabbitExchange.isDurable(), rabbitExchange.isAutoDelete(), rabbitExchange.getArguments()); case HEADER: //该类型不常见 return new HeadersExchange(rabbitExchange.getName(), rabbitExchange.isDurable() , rabbitExchange.isAutoDelete(), rabbitExchange.getArguments()); default: return null; } }}
需要引入的依赖:
创建虚拟主机时,和MQ进行远程通信。因为借助了httpClient
所以需要引入对应依赖。 org.apache.httpcomponents httpclient 4.5.12
工具类使用的对应的实体类如下所示:
队列实体类:
@Datapublic class RabbitQueue { /** * 队列名 */ String name; /** * 虚拟主机名 */ String vhost; /** * 是否持久化队列 */ boolean durable; /** * true:队列上没有consumer时,自动删除队列 */ boolean autoDelete; Maparguments;}
交换集枚举对象:
public enum RabbitExchangeTypeEnum { /** * 直连模式 */ DIRECT, /** * 通配符模式 */ TOPIC, /** * 广播模式 */ FANOUT, HEADER}
交换机的实体类配置:
@Datapublic class RabbitExchange { /** * 交换机名 */ String name; /** * 虚拟主机名 */ String vhost; /** * 交换机类型 */ RabbitExchangeTypeEnum type; /** * 是否延迟交换机 */ boolean delayed; /** * 是否持久化 */ boolean durable = true; /** * true:没有队列时,自动删除交换机 */ boolean autoDelete; /** * 其他参数 */ Maparguments;}
虚拟主机的实体配置:
@Datapublic class RabbitVirtualHost { /** * Mq的Ip地址 */ String host; /** * Mq的端点 */ Integer port; /** * 虚拟主机 */ String vhost; /** * Mq的用户名 */ String username; /** * Mq的密码 */ String password;}
绑定关系的实体配置:
@Datapublic class RabbitBinding { /** * 虚拟主机名 */ String vhost; /** * 连接键名 */ String routingKey; /** * 交换机名 */ String exchange; /** * 队列名 */ String queue; /** * 配置 */ Maparguments; }
3 测试类
通过http请求便可以动态的去创建MQ组件,测试代码如下:
@RestControllerpublic class RabbitMQController { @Autowired ProducerService producerService; private int i; //直接向队列中发送数据 @GetMapping("send") public String send() { RabbitMessage message = new RabbitMessage(); message.setVhost("/test"); message.setBody(String.format("send message %s", i)); message.setRoutingKey("test.directRoutingKey-1"); message.setExchange("test.directExchange"); producerService.send(message); return "success"; } @PostMapping("/exchange") public Exchange createExchange() { RabbitExchange rabbitExchange = new RabbitExchange(); rabbitExchange.setName("test.directExchange.delayed"); rabbitExchange.setAutoDelete(false); rabbitExchange.setDurable(true); rabbitExchange.setVhost("/test"); //注册的是延迟队列 rabbitExchange.setDelayed(true); rabbitExchange.setType(RabbitExchangeTypeEnum.DIRECT); return RabbitHandlerUtils.createExchange(rabbitExchange); } @PostMapping("/queue") public Queue createQueue() { RabbitQueue rabbitQueue = new RabbitQueue(); rabbitQueue.setVhost("/test"); rabbitQueue.setName("test.directQueue-1"); rabbitQueue.setDurable(true); return RabbitHandlerUtils.createQueue(rabbitQueue); } @DeleteMapping("/queue") public void deleteQueue() { RabbitHandlerUtils.deleteQueue("/test", "test.directQueue-1"); } @PostMapping("/binding") public void createBinding() { RabbitBinding rabbitBinding = new RabbitBinding(); rabbitBinding.setVhost("/test"); rabbitBinding.setQueue("test.directQueue.delayed-1"); rabbitBinding.setExchange("test.directExchange.delayed"); rabbitBinding.setRoutingKey("test.directRoutingKey-delayed-1"); RabbitHandlerUtils.bind(rabbitBinding); } @GetMapping("/count") public int getMessageCount() { return RabbitHandlerUtils.getMessageCount("/test", "test.directQueue-1"); } @GetMapping("vhost") public void createVHost() { RabbitVirtualHost rabbitVirtualHost = new RabbitVirtualHost(); rabbitVirtualHost.setHost("localhost"); rabbitVirtualHost.setPort(5672); rabbitVirtualHost.setUsername("guest"); rabbitVirtualHost.setPassword("guest"); rabbitVirtualHost.setVhost("/test-2"); try { RabbitHandlerUtils.createVHost(rabbitVirtualHost); } catch (IOException e) { e.printStackTrace(); } }}
转载地址:https://blog.csdn.net/qq_29595463/article/details/107865855 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
很好
[***.229.124.182]2024年04月07日 12时03分17秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
蓝桥杯 - [2013年第四届真题]买不到的数目(数论|动态规划)
2019-04-28
蓝桥杯 - [历届试题]国王的烦恼(并查集)
2019-04-28
蓝桥杯 - [2013年第四届真题]打印十字图(规律)
2019-04-28
蓝桥杯 - [历届试题]翻硬币(贪心)
2019-04-28
蓝桥杯 - [历届试题]连号区间数(暴力)
2019-04-28
蓝桥杯 - [算法提高VIP]分苹果(差分数组)
2019-04-28
蓝桥杯 - [算法提高VIP]最小乘积(贪心)
2019-04-28
蓝桥杯 - [基础练习VIP]数的读法(模拟)
2019-04-28
蓝桥杯 - [基础练习VIP]芯片测试
2019-04-28
蓝桥杯 - [基础练习VIP]龟兔赛跑预测(模拟)
2019-04-28
蓝桥杯 - [2014年第五届真题]分糖果(模拟)
2019-04-28
蓝桥杯 - [2013年第四届真题]剪格子(DFS)
2019-04-28
蓝桥杯 - [历届试题]城市建设(最小生成树)
2019-04-28
蓝桥杯 - [2013年第四届真题]大臣的旅费(DFS)
2019-04-28
蓝桥杯 - [2013年第四届真题]带分数(全排列)
2019-04-28
蓝桥杯 - [2013年第四届真题]幸运数(模拟)
2019-04-28
蓝桥杯 - [2013年第四届真题]横向打印二叉树(排序二叉树)
2019-04-28
蓝桥杯 - [历届试题]网络寻路(枚举)
2019-04-28
牛客网 - [中南林业科技大学第十一届程序设计大赛]兑换零钱(背包问题)
2019-04-28
HDU - Robberies(01背包)
2019-04-28