【RabbitMQ-9】动态上线下线Consumer
发布日期:2022-02-14 16:09:34 浏览次数:21 分类:技术文章

本文共 19148 字,大约阅读时间需要 63 分钟。

文章目录

若实现动态的上线下线Consumer,那么就不能使用@RabbitListener方式去声明消费者。

1. 设计理念

  1. 代码中只会存在一个监听类(这个类可以为每一个queue新增一个监听对象),动态的上线下线Consumer,本质上是将Queue加监听对象中。
  2. 通过http通信来只能修改一台机器,但集群所有机器都要增加Consumer,需要使用广播模式通知到每一台机器。

2. 消费者配置

在中,项目启动时,会初始化MQ的上下文对象配置(RabbitContextHolder)。

2.1 项目启动后,读取数据库中消费者配置

从数据库中拿到消费者的配置信息(下列代码是在静态代码块中进行模拟)。增加消费者,是将消费者与对应虚拟机的队列创建监听容器。在项目启动时,开启监听。

  1. 这个类依赖RabbitContextHolder类;
  2. 采用自动ACK的方式确认消息;
@Slf4j@DependsOn("rabbitContextHolder")@Componentpublic class ConsumerInitializer {
//容器的缓存对象 private static ConcurrentHashMap
> containerMapping = new ConcurrentHashMap<>(); private static List
rabbitConsumers = new ArrayList<>(); //配置中的消费者 static {
RabbitConsumer consumer = new RabbitConsumer(); //虚拟机名 consumer.setVhost("/test"); //队列名 consumer.setQueueName("test.directQueue-1"); //开启的消费者线程 consumer.setConcurrency("2"); //抓取的数量 consumer.setPerfetch(10); //重试策略 consumer.setRetryStrategyEnum(RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES); rabbitConsumers.add(consumer); } //项目启动时,动态的创建消费者 @PostConstruct void init() {
//以虚拟主机分组 Map
> consumerList = rabbitConsumers.stream(). collect(Collectors.groupingBy(RabbitConsumer::getVhost)); List
containerList = new ArrayList<>(); //遍历所有的消费者配置 consumerList.forEach((vhost, consumers) -> {
//判断内存是否缓存连接工厂(根据虚拟主机名) if (RabbitContextHolder.getConnectionFactory(vhost) == null) {
log.error("不能连接到虚拟主机[{}]", vhost); return; } //初始化SimpleMessageListenerContainer监听容器 consumers.forEach(consumer -> Optional.ofNullable(newContainer(consumer)).ifPresent(containerList::add)); //将容器加入到缓存中 containerMapping.put(vhost, containerList); }); } /** * 创建监听容器 * * @param consumer 消费者 * @return 监听容器 */ public static SimpleMessageListenerContainer newContainer(RabbitConsumer consumer) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); //监听的队列 container.setQueueNames(consumer.getQueueName()); //自动ACK container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(consumer.getVhost())); container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(consumer.getVhost())); //预取的数量 container.setPrefetchCount(consumer.getPerfetch()); //是否自动声明 队列、交换机、绑定 container.setAutoDeclare(false); //设置初始化后是否自动启动容器。 container.setAutoStartup(true); //是否设置排他性,即该消费者独享该队列 container.setExclusive(false); //消费者开启几个线程 container.setConcurrency(consumer.getConcurrency()); //消费者的监听 container.setMessageListener(new NormalListener(consumer)); //设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 container.setDefaultRequeueRejected(false); //当监听类中不处理异常,那么异常会回调该方法。 container.setErrorHandler(e -> log.error("个性化处理:丢弃消息啦!")); //初始化这个类 container.afterPropertiesSet(); return container; } /** * 项目启动时,启动监听容器。 */ public static void startAllConsumers() {
containerMapping.values().forEach(it -> {
for (SimpleMessageListenerContainer bean : it) {
bean.start(); } }); }}

2.2 项目启动时,声明内部队列

  1. 为什么需要创建内部——广播模式的队列
  1. 若是动态的增加/减少虚拟机配置,那么需要给集群所有机器的内存新增/减少配置,即调用RabbitContextHolder#addNewVHost方法。
  2. 若是动态的上线/下线Consumer,也需要为给集群所有机器的内存增加/减少消费者容器配置。

广播模式注意点:集群中每台机器在启动时都生成一个队列(队列名由Linux机器的ip地址+随机数组合成),每台机器创建一个Consumer去监听消息。当机器重启时,消费者会下线,对应的广播队列会自动删除。

  1. 为什么需要创建内部——延迟队列

因为上面采用的是自动ACK模式,当发生异常时,会将消息放入到延迟队列中等待下次消费。

@Slf4j@DependsOn("rabbitContextHolder")@Componentpublic class ConsumerInitializer {
/** * 启动内部队列 * * @throws IOException */ public static void initializeInternalConsumers() throws IOException {
//启动内部监听(广播监听) startInternalEventConsumer(); //当消费者出现异常,开启延迟队列进行重试 startRetryEventConsumer(); } /** * 开启内部监听(广播模式) *

* 项目启动后,本机器开启(广播)消息监听。 * * @throws IOException */ private static void startInternalEventConsumer() {

//为每一台机器声明一个独一无二的队列名,当机器上消费者下线时,自动删除。 String queueName = String.format(RabbitConstants.INTERNAL_EVENT_QUEUE_TEMPLATE, getLinuxLocalIp(), getRandomNumber()); RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST); Exchange exchange = ExchangeBuilder.fanoutExchange(RabbitConstants.INTERNAL_EVENT_EXCHANGE).durable(true).build(); rabbitAdmin.declareExchange(exchange); rabbitAdmin.declareQueue(new Queue(queueName, true, false, true)); //广播模式的路由键为"" rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, RabbitConstants.INTERNAL_EVENT_EXCHANGE, "", null)); //创建内部队列 SimpleMessageListenerContainer simpleMessageListenerContainer = newInternalContainer(queueName, "1", ListenerType.NORMAL); simpleMessageListenerContainer.start(); } /** * 创建延迟队列 */ public static void startRetryEventConsumer() {
RabbitAdmin rabbitAdmin = RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST); Exchange directExchange = ExchangeBuilder.directExchange(RabbitConstants.INTERNAL_DELAYED_EXCHANGE).delayed().durable(true).build(); rabbitAdmin.declareExchange(directExchange); rabbitAdmin.declareQueue(new Queue(RabbitConstants.INTERNAL_RETRY_QUEUE, true, false, false)); rabbitAdmin.declareBinding(new Binding(RabbitConstants.INTERNAL_RETRY_QUEUE, Binding.DestinationType.QUEUE, RabbitConstants.INTERNAL_DELAYED_EXCHANGE, RabbitConstants.INTERNAL_RETRY_ROUTING_KEY, null)); //3-10表示初始化开启3个线程,当负载增加时,线程数缓慢得增加到10个线程。 SimpleMessageListenerContainer simpleMessageListenerContainer = newInternalContainer(RabbitConstants.INTERNAL_RETRY_QUEUE, "3-10", ListenerType.RETRY); simpleMessageListenerContainer.start(); } /** * 创建内部监听容器(广播模式&&延迟队列) * * @param queueNames 队列名 * @param concurrency 并发数 * @return */ public static SimpleMessageListenerContainer newInternalContainer(String queueNames, String concurrency, ListenerType listenerType) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setQueueNames(queueNames); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setAmqpAdmin(RabbitContextHolder.getRabbitAdmin(RabbitConstants.INTERNAL_VHOST)); container.setConnectionFactory(RabbitContextHolder.getConnectionFactory(RabbitConstants.INTERNAL_VHOST)); container.setAutoStartup(true); container.setAutoDeclare(false); container.setExclusive(false); //该参数默认是250,按照参数修改。 container.setPrefetchCount(1); //3-10 默认是3个,若是负载上来的话,会缓慢增长到10个。 container.setConcurrency(concurrency); if (ListenerType.NORMAL.equals(listenerType)) {
container.setMessageListener(new InternalEventMessageListener()); } else {
container.setMessageListener(new RetryMessageListener()); } return container; } /** * 获取到每台服务器的Ip地址,组装队列名 * * @return */ private static String getLinuxLocalIp() {
String ip = ""; try {
for (Enumeration
en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
NetworkInterface intf = en.nextElement(); String name = intf.getName(); if (!name.contains("docker") && !name.contains("lo")) {
for (Enumeration
enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements(); ) {
InetAddress inetAddress = enumIpAddr.nextElement(); if (!inetAddress.isLoopbackAddress()) {
String ipaddress = inetAddress.getHostAddress(); if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") && !ipaddress.contains("fe80")) {
ip = ipaddress; } } } } } } catch (SocketException ex) {
log.error("获取ip异常"); ip = "127.0.0.1"; ex.printStackTrace(); } log.info("Linux的IP为:" + ip); return ip; } public static int getRandomNumber() {
return (int) ((Math.random() * 9 + 1) * 100000); }}

2.3 项目运行时,动态的新增/减少Consumer配置

@Slf4j@DependsOn("rabbitContextHolder")@Componentpublic class ConsumerInitializer {
//容器的缓存对象 private static ConcurrentHashMap
> containerMapping = new ConcurrentHashMap<>(); /** * 挂起消费者 *

* 将一个队列的所有消费者全部挂起。 * * @param vhost 虚拟主机 * @param queueName 队列名 */ public static void blockConsumer(String vhost, String queueName) {

//消费者的集合 Optional
opt = ConsumerInitializer.getContainerMapping(vhost). stream(). filter(c -> Arrays.asList(c.getQueueNames()).contains(queueName)). findAny(); opt.ifPresent(c -> {
//停止消费者 c.stop(); //在缓存中找到容器信息并删除 ConsumerInitializer.getContainerMapping(vhost).remove(c); }); } /** * 开启消费者 * * @param consumer */ public static void openConsumer(RabbitConsumer consumer) {
//为每台机器new出现一个监听容器 SimpleMessageListenerContainer simpleMessageListenerContainer = ConsumerInitializer.newContainer(consumer); if (simpleMessageListenerContainer != null) {
//放入缓存中 ConsumerInitializer.getContainerMapping(consumer.getVhost()).add(simpleMessageListenerContainer); //开启消费者 simpleMessageListenerContainer.start(); } }}

实体类,消费者信息:

@Datapublic class RabbitConsumer {
/** * 虚拟主机 */ private String vhost; /** * 并发数 */ private String concurrency; /** * 一次性获取消息数目 * limit 1-999 */ private Integer perfetch; /** * 监听的队列名 */ private String queueName; /** * 回调的API地址 */ private String bizApi; private RetryStrategyEnum retryStrategyEnum; /** * API 返回成功值 */ private String bizSuccessCode; /** * API 超时时间 */ private Integer timeout; }

3. 监听类配置

3.1 应用队列的监听

该项目中只存在一个监听类,每一个队列都会new一个监听对象来处理。当收到监听信息后,会在RabbitConsumer找到对API地址,通过http回调应用服务器方法。进行消息的处理,当发生异常后,会根据重试策略,将消息重新丢弃或放入到延迟队列(注意的是,若消息是广播模式的那么不会进行重试)。

@Slf4j@Datapublic class NormalListener implements ChannelAwareMessageListener {
private RabbitConsumer consumer; public NormalListener() {
} public NormalListener(RabbitConsumer consumer) {
this.consumer = consumer; } @Override public void onMessage(Message message, Channel channel) {
MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class); //处理消息 messageHandler.normalProcess(message,consumer); }}

实现消息异常的重试——放入到延迟队列中。

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import java.util.Map;import java.util.Optional;/** * @author by yexuerui@xdf.cn * @Date 2020-08-05 16:01 */@Component@Slf4jpublic class MessageHandler {
/** * 消息处理 核心流程 * * @param consumer 消费者 * @return 返回值为处理成功或失败 */ public boolean normalProcess(Message message, RabbitConsumer consumer) {
String messageId = message.getMessageProperties().getMessageId(); try {
String url = consumer.getBizApi(); log.info("回调业务接口处理消息,url为{}!", url); //获取到配置的urel Thread.sleep(2000000); return true; } catch (Exception e) {
log.error("An error occurs during process an message in PigeonMessageListener, MessageId : {}", messageId, e); dueException(consumer, message); return false; } } /** * error * * @param consumer * @param message */ private void dueException(RabbitConsumer consumer, Message message) {
Map
headers = message.getMessageProperties().getHeaders(); String vhost = (String) headers.get("vhost"); String exchange = (String) headers.get("exchange"); if (!this.wasFanout(vhost, exchange)) {
message.getMessageProperties().setHeader("consumer", JSONObject.toJSONString(consumer)); RetryStrategyEnum retryStrategy = Optional.ofNullable(consumer.getRetryStrategyEnum()).orElse(RetryStrategyEnum.NONE); retryPolicy(message, retryStrategy); } } /** * 验证exchange是否为FANOUT类型 * * @param vhost vhost * @param exchange exchange * @return exchange type is fouout */ private boolean wasFanout(String vhost, String exchange) {
return false; } /** * 重试策略 * * @param message * @param retryStrategy */ private void retryPolicy(Message message, RetryStrategyEnum retryStrategy) {
Map
headers = message.getMessageProperties().getHeaders(); Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0); switch (retryStrategy) {
case NONE: log.info("不进行重试"); break; case IMMEDIATELY_THREE_TIMES: if (retryTimes < 3) {
log.info("固定时间重试"); //放入到延迟队列中,等待下次调用 sendToRetry(message, retryStrategy); } else {
log.info("固定时间重试——没有重试机会!"); } break; case EXPONENTIAL_BACKOFF_10_TIMES: if (retryTimes < 1) {
log.info("阶梯时间重试"); sendToRetry(message, retryStrategy); } else {
log.info("阶梯时间重试——没有重试机会!"); } break; default: break; } } public void sendToRetry(Message message, RetryStrategyEnum retryStrategy) {
Map
headers = message.getMessageProperties().getHeaders(); //获取到重试的次数 Integer retryTimes = (Integer) headers.getOrDefault("retry_times", 0); int delayMillions; if (RetryStrategyEnum.IMMEDIATELY_THREE_TIMES.equals(retryStrategy)) {
delayMillions = 1000; } else if (RetryStrategyEnum.EXPONENTIAL_BACKOFF_10_TIMES.equals(retryStrategy)) {
//指数增加延迟时间 delayMillions = (2 << retryTimes) * 1000; } else {
log.error("不进行重试!"); return; } //设置重试次数 message.getMessageProperties().setHeader("retry_times", retryTimes + 1); //设置延迟行 log.error("延迟时间:[{}]", delayMillions); message.getMessageProperties().setDelay(delayMillions); //发送消息到延迟队列 RabbitTemplate rabbitTemplate = RabbitContextHolder.getRabbitTemplate(RabbitConstants.INTERNAL_VHOST); rabbitTemplate.convertAndSend( RabbitConstants.INTERNAL_DELAYED_EXCHANGE, RabbitConstants.INTERNAL_RETRY_ROUTING_KEY, message ); }}

3.2 延迟队列的监听

当延迟队列收到消息后,会继续对消息进行消费,若消费失败,根据重试策略,对消息选择丢弃或继续放入延迟队列等待处理。

public class RetryMessageListener implements MessageListener {
@Override public void onMessage(Message message) {
Map
headers = message.getMessageProperties().getHeaders(); String consumerStr = (String)headers.get("consumer"); RabbitConsumer consumer = JSONObject.parseObject(consumerStr, RabbitConsumer.class); //设置延迟的时间 MessageHandler messageHandler = SpringUtil.getBean(MessageHandler.class); messageHandler.normalProcess(message,consumer); }}

3.3 广播模式队列监听

广播模式的事件类型分为两类:

  1. 在内存中新增/减少消费者;
  2. 在内存中新增/减少RabbitMQ的配置信息(RabbitAdmin、RabbitTemplate等)
@Slf4jpublic class InternalEventMessageListener implements MessageListener {
@Override public void onMessage(Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8); InternalEvent event = JSONObject.parseObject(body, InternalEvent.class); log.info("内部监听到信息[{}]", JSONObject.toJSONString(event)); //下掉一个队列的所有消费者 if (InternalEvent.EventType.CONSUMER.equals(event.getEventType())) {
if (InternalEvent.Operation.DISABLE == event.getOperation()) {
log.info("下掉一个队列上的所有消费者..."); ConsumerInitializer.blockConsumer("/test", "test.directQueue-1"); } else {
//为一个队列增加消费者 RabbitConsumer consumer = new RabbitConsumer(); consumer.setVhost("/test"); consumer.setQueueName("test.directQueue-1"); consumer.setConcurrency("1"); consumer.setPerfetch(1); ConsumerInitializer.openConsumer(consumer); } } else {
//增加虚拟机配置 if (InternalEvent.Operation.DISABLE.equals(event.getOperation())) {
//广播通知,为内存中的map增加数据 RabbitContextHolder.delVHost("/test"); } else {
//广播通知,为内存中的map增加数据 RabbitVirtualHost rabbitVirtualHost = new RabbitVirtualHost(); rabbitVirtualHost.setHost("localhost"); rabbitVirtualHost.setPort(5672); rabbitVirtualHost.setUsername("guest"); rabbitVirtualHost.setPassword("guest"); rabbitVirtualHost.setVhost("/test"); //为内存中增加数据 RabbitContextHolder.addNewVHost(rabbitVirtualHost); } } }}

事件类配置

@Data@ToString@NoArgsConstructor@AllArgsConstructorpublic class InternalEvent {
@NotNull private EventType eventType; @NotNull private Operation operation; /** * 通过id去数据库那到虚拟机或者消费者的配置 */ @NotNull private Integer id; public enum EventType {
VHOST, CONSUMER } public enum Operation {
ENABLE, DISABLE }}

4. 项目启动时,初始化内部队列

SpringBoot项目启动时,初始化方法加载顺序可以参考。

@Slf4j@Componentpublic class ApplicationStartedListener implements ApplicationListener
{
@Autowired ApplicationContext applicationContext; @Override public void onApplicationEvent(ApplicationReadyEvent event) {
try {
ConsumerInitializer.startAllConsumers(); log.info("启动消息中心所有的消费者!"); ConsumerInitializer.initializeInternalConsumers(); log.info("启动 内部 事件监听消费者 && 内容重试消费者"); } catch (Exception e) {
log.error("", e); SpringApplication.exit(applicationContext); } }}

转载地址:https://blog.csdn.net/qq_29595463/article/details/107906476 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【RabbitMQ-8】SpringBoot2.x动态的创建Queue、Exchange、VirtualHost、Binding
下一篇:【SpringBoot2.x-3】使用Redis的bitmap实现布隆过滤器(Guava中BF算法)

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年04月06日 09时35分04秒