【RabbitMQ-9】@RabbitListener注解生效的源码分析
发布日期:2022-02-14 16:09:36 浏览次数:28 分类:技术文章

本文共 13253 字,大约阅读时间需要 44 分钟。

文章目录

为何配置
@RabbitListener注解,对应的方法便可以消费MQ的消息?

核心思想:

  1. 读取注解的配置;
  2. 根据配置去监听queue的信息(即创建消费者线程)。

1. 读取注解配置

读取注解配置时机:创建bean对象,执行BeanPostProcessorpostProcessAfterInitialization方法时,将bean对象及其方法的注解配置读取到缓存中。

//bean执行BeanPostProcessor的方法public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException {    Object result = existingBean;    for (BeanPostProcessor processor: getBeanPostProcessors()) {        Object current = processor.postProcessAfterInitialization(result, beanName);        if (current == null) {            return result;        }        result = current;    }    return result;}

会被org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor处理,解析注解配置。

RabbitListenerAnnotationBeanPostProcessor类被@EnableRabbit注解加入到本项目的Spring容器中,所以若想MQ的注解生效,项目启动类需要使用@EnableRabbit注解。

注:RabbitListenerAnnotationBeanPostProcessor处理后并不会生成代理对象,这个处理器仅仅是解析注解。

1.1 后置处理器的核心方法

public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {    //获取到目标对象(作为Map缓存的key)    Class < ?>targetClass = AopUtils.getTargetClass(bean);    //在Map中若取不到,那么执行buildMetadata()方法生成。    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this: :buildMetadata);    //第一层是获取到ListenerMethod对象(见下文)    for (ListenerMethod lm: metadata.listenerMethods) {        //一个方法上可能有多个注解,于是循环多个注解配置。        for (RabbitListener rabbitListener: lm.annotations) {            processAmqpListener(rabbitListener, lm.method, bean, beanName);        }    }    if (metadata.handlerMethods.length > 0) {        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);    }    return bean;}

1.2 读取注解信息

private TypeMetadata buildMetadata(Class 
targetClass) { //读取目标类的注解 Collection
classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List
methods = new ArrayList <>(); final List
multiMethods = new ArrayList <>(); //遍历该类上满足USER_DECLARED_METHODS条件的方法(用户定义的方法) ReflectionUtils.doWithMethods(targetClass, method - >{ //解析方法上的注解 Collection
listenerAnnotations = findListenerAnnotations(method); if (listenerAnnotations.size() > 0) { //放入到List集合中 methods.add(new ListenerMethod(method, listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()]))); } //若是类上有@RabbitListener注解,那么取解析@RabbitHandler注解 if (hasClassLevelListeners) { RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class); if (rabbitHandler != null) { multiMethods.add(method); } } }, ReflectionUtils.USER_DECLARED_METHODS); if (methods.isEmpty() && multiMethods.isEmpty()) { return TypeMetadata.EMPTY; } //返回对象 return new TypeMetadata(methods.toArray(new ListenerMethod[methods.size()]), multiMethods.toArray(new Method[multiMethods.size()]), classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));

而TypeMetadata实际上是多个集合的对象

private static class TypeMetadata {    //方法上带有@RabbitListener    final ListenerMethod[] listenerMethods; // NOSONAR    //方法上带有@RabbitHandler    final Method[] handlerMethods; // NOSONAR    //类上带有@RabbitListener    final RabbitListener[] classAnnotations; // NOSONAR    static final TypeMetadata EMPTY = new TypeMetadata();    ....}

而ListenerMethod对象就是保存了Method对象和上面的注解配置。

private static class ListenerMethod {    final Method method; // NOSONAR    final RabbitListener[] annotations; // NOSONAR    ...}

2. 创建消费者线程

2.1 创建MethodRabbitListenerEndpoint

MethodRabbitListenerEndpoint保存了方法信息和注解配置信息的对象(可以看做临时对象)

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {    //检查是否是JDK代理,若是JDK代理是否实现接口。    Method methodToUse = checkProxy(method, bean);    //创建端点对象    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();    //填充方法对象    endpoint.setMethod(methodToUse);    //填充端点对象    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);}

注意,registrar在属性上new RabbitListenerEndpointRegistrar()创建的。

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object adminTarget, String beanName) {    endpoint.setBean(bean);    ...//填充配置的set方法    resolveAdmin(endpoint, rabbitListener, adminTarget);    RabbitListenerContainerFactory < ?>factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);    //端点信息进行注册    this.registrar.registerEndpoint(endpoint, factory);}

2.2 端点的注册

对应的类RabbitListenerEndpointRegistrar

public void registerEndpoint(RabbitListenerEndpoint endpoint, @Nullable RabbitListenerContainerFactory < ?>factory) {    Assert.notNull(endpoint, "Endpoint must be set");    Assert.hasText(endpoint.getId(), "Endpoint id must be set");    Assert.state(!this.startImmediately || this.endpointRegistry != null, "No registry available");    // Factory may be null, we defer the resolution right before actually creating the container    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);    synchronized(this.endpointDescriptors) {        //是否注册的时候立即启动?        if (this.startImmediately) { // Register and start immediately            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null            resolveContainerFactory(descriptor), true);        } else {            //不是立即启动,那么放入到List中            this.endpointDescriptors.add(descriptor);        }    }}

注意:RabbitListenerEndpointRegistrar实现了InitializingBean接口,在bean创建中会执行回调方法afterPropertiesSet()

上面说到,注册时因为不是立即启动,将descriptor存放到了List中,而是回调方法中统一进行处理。

@Override public void afterPropertiesSet() {    registerAllEndpoints();}//注册所有端点protected void registerAllEndpoints() {    Assert.state(this.endpointRegistry != null, "No registry available");    synchronized(this.endpointDescriptors) {        for (AmqpListenerEndpointDescriptor descriptor: this.endpointDescriptors) {            //创建监听容器            this.endpointRegistry.registerListenerContainer( // NOSONAR never null            descriptor.endpoint, resolveContainerFactory(descriptor));        }        this.startImmediately = true; // trigger immediate startup    }}

2.3 监听容器的创建

对象信息:RabbitListenerEndpointRegistry

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory < ?>factory) {    //立即启动为false    registerListenerContainer(endpoint, factory, false);}
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory < ?>factory, boolean startImmediately) {    Assert.notNull(endpoint, "Endpoint must not be null");    Assert.notNull(factory, "Factory must not be null");    String id = endpoint.getId();    Assert.hasText(id, "Endpoint id must not be empty");    synchronized(this.listenerContainers) {        Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'");        //核心方法。创建容器        MessageListenerContainer container = createListenerContainer(endpoint, factory);       //核心操作:将容器放入到List中        this.listenerContainers.put(id, container);       ...//根据groupId分组,不关心         if (startImmediately) { //默认false,不关心            startIfNecessary(container);        }    }}

上面说到MethodRabbitListenerEndpoint可以看做临时对象(临时存储对象信息和注解配置信息)。目的就是创建监听对象,注意监听对象最终也是放入了list对象中。

RabbitMQ使用默认的SimpleRabbitListenerContainerFactory监听工厂。

AbstractRabbitListenerContainerFactory实现的方法:

将endpoint的配置信息存入SimpleMessageListenerContainer对象中。

public C createListenerContainer(RabbitListenerEndpoint endpoint) {    C instance = createContainerInstance();    JavaUtils javaUtils = JavaUtils.INSTANCE.acceptIfNotNull(this.connectionFactory, instance: :setConnectionFactory).acceptIfNotNull(this.errorHandler, instance: :setErrorHandler);    if (this.messageConverter != null) {        if (endpoint != null) {            endpoint.setMessageConverter(this.messageConverter);            if (endpoint.getMessageConverter() == null) {                instance.setMessageConverter(this.messageConverter);            }        } else {            instance.setMessageConverter(this.messageConverter);        }    }    javaUtils.acceptIfNotNull(this.acknowledgeMode, instance: :setAcknowledgeMode).acceptIfNotNull(this.channelTransacted, instance: :setChannelTransacted).acceptIfNotNull(this.applicationContext, instance: :setApplicationContext).acceptIfNotNull(this.taskExecutor, instance: :setTaskExecutor).acceptIfNotNull(this.transactionManager, instance: :setTransactionManager).acceptIfNotNull(this.prefetchCount, instance: :setPrefetchCount).acceptIfNotNull(this.defaultRequeueRejected, instance: :setDefaultRequeueRejected).acceptIfNotNull(this.adviceChain, instance: :setAdviceChain).acceptIfNotNull(this.recoveryBackOff, instance: :setRecoveryBackOff).acceptIfNotNull(this.mismatchedQueuesFatal, instance: :setMismatchedQueuesFatal).acceptIfNotNull(this.missingQueuesFatal, instance: :setMissingQueuesFatal).acceptIfNotNull(this.consumerTagStrategy, instance: :setConsumerTagStrategy).acceptIfNotNull(this.idleEventInterval, instance: :setIdleEventInterval).acceptIfNotNull(this.failedDeclarationRetryInterval, instance: :setFailedDeclarationRetryInterval).acceptIfNotNull(this.applicationEventPublisher, instance: :setApplicationEventPublisher).acceptIfNotNull(this.autoStartup, instance: :setAutoStartup).acceptIfNotNull(this.phase, instance: :setPhase).acceptIfNotNull(this.afterReceivePostProcessors, instance: :setAfterReceivePostProcessors);    if (endpoint != null) {        if (endpoint.getAutoStartup() != null) {            instance.setAutoStartup(endpoint.getAutoStartup());        }        instance.setListenerId(endpoint.getId());        //绑定队列信息        endpoint.setupListenerContainer(instance);    }    if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {        AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance.getMessageListener();        javaUtils.acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener: :setBeforeSendReplyPostProcessors).acceptIfNotNull(this.retryTemplate, messageListener: :setRetryTemplate).acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback, messageListener: :setRecoveryCallback);    }    //个性化处理。    initializeContainer(instance, endpoint);    if (this.containerConfigurer != null) {        this.containerConfigurer.accept(instance);    }    return instance;}

方法上每一个@RabbitListener注解都会创建一个SimpleMessageListenerContainer容器,并放入到List集合中。

2.4 监听容器的启动

RabbitListenerEndpointRegistry对象的结构图:

RabbitListenerEndpointRegistry结构图.png

注意RabbitListenerEndpointRegistry接口实现了Lifecycle类,即Spring容器初始化完毕,会执行start()方法。

执行RabbitListenerEndpointRegistry#start()方法,实际上是遍历所有的监听容器对象,执行监听容器的start()方法开启监听。

@Override public void start() {    //bean创建完毕后,遍历存储ListenerContainer的集合,并且开启监听容器    for (MessageListenerContainer listenerContainer: getListenerContainers()) {        startIfNecessary(listenerContainer);    }}private void startIfNecessary(MessageListenerContainer listenerContainer) {    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {        listenerContainer.start();    }}

监听容器的start()方法:

//对应源码:org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#start@Override public void start() {       ....    try {        logger.debug("Starting Rabbit listener container.");        configureAdminIfNeeded();        checkMismatchedQueues();        //子类实现,开启监听容器        doStart();    } catch(Exception ex) {        throw convertRabbitAccessException(ex);    }}

子类开启监听容器:

消费者线程一旦开启启动,那么便会一直去监听消息,并且去处理消息。

//org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart@Override protected void doStart() {    checkListenerContainerAware();    super.doStart();    synchronized(this.consumersMonitor) {        if (this.consumers != null) {            throw new IllegalStateException("A stopped container should not have consumers");        }      //根据配置的concurrentConsumers参数,创建消费者并存储到Set中        int newConsumers = initializeConsumers();         ...        Set 
processors = new HashSet
(); //根据配置的concurrentConsumers创建消费者线程 for (BlockingQueueConsumer consumer: this.consumers) { //创建消费者线程 AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); //使用线程池去执行消费者线程 getTaskExecutor().execute(processor); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } //等待消费者线程执行成功 waitForConsumersToStart(processors); }}

2.5 总结

@RabbitListener生效的核心流程是:读取@RabbitListener配置,创建SimpleMessageListenerContainer对象。并且调用SimpleMessageListenerContainer对象的start()方法,创建消费者线程并且启动。

3. 为什么将SimpleMessageListenerContainer对象加入到Spring容器便可以监听队列?

AbstractMessageListenerContainer接口实现了Lifecycle接口,将其放入到Spring容器后,会执行生命周期的回调方法,即自动执行start()方法,开启队列监听。

SimpleMessageListenerContainer结构.png

转载地址:https://blog.csdn.net/qq_29595463/article/details/109815193 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:一样的httpclient,不一样的解读
下一篇:es(4)—查询条件match和term

发表评论

最新留言

关注你微信了!
[***.104.42.241]2024年04月19日 10时49分54秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

Flink在美团的应用与实践听课笔记 2019-04-27
Java多线程的11种创建方式以及纠正网上流传很久的一个谬误 2019-04-27
JDK源码研究Jstack,JMap,threaddump,dumpheap的原理 2019-04-27
Java使用字节码和汇编语言同步分析volatile,synchronized的底层实现 2019-04-27
javac编译原理和javac命令行的使用 2019-04-27
Unity使用UnityWebRequest实现本地日志上传到web服务器 2019-04-27
Unity使用RenderTexture实现裁切3D模型 2019-04-27
美术和程序吵架,原来是资源序列化格式设置不统一 2019-04-27
Unity iOS接SDK,定制UnityAppController 2019-04-27
Unity iOS接SDK前先要了解的知识(Objective-C) 2019-04-27
python遇到了‘module‘ object has no attribute ‘socket‘问题,大概率是这个原因 2019-04-27
记一次iOS闪退问题的定位:NSLog闪退 2019-04-27
Unity打开照相机与打开本地相册然后在Unity中显示照片(Android与iOS) 2019-04-27
无需接入SDK即可在Unity中获取经纬度(Android/iOS),告诉我你的坐标 2019-04-27
Unity获取系统信息SystemInfo(CPU、显卡、操作系统等信息) 2019-04-27
Unity中获取物体的尺寸(size)的三种方法 2019-04-27
Unity中的关节组件和绳子效果的实现 2019-04-27
Unity可视化编程插件: Bolt,可以像UE4的蓝图那样啦 2019-04-27
Android使用adb logcat时日志中文乱码问题,使用chcp 65001设置编码即可 2019-04-27
Android的.dex、.odex与.oat文件扫盲 2019-04-27