ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

源码解析: Spring RabbitMQ消费者

2022-02-20 15:30:49  阅读:137  来源: 互联网

标签:beanName mbd Spring Object RabbitMQ bean 源码 BlockingQueueConsumer new


从Spring RabbitMQ消费者启动,到接收消息和执行消费逻辑,一步步了解其实现。

目录

1. 消费者如何启动过程

1.1 启动配置类

创建RabbitListenerAnnotationBeanPostProcessor

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}
.....
}

1.2 创建消费者核心逻辑

核心逻辑在RabbitListenerAnnotationBeanPostProcessor,在Spring Bean初始化过程中执行。
对于每个消息监听都会创建对应的MessageListenerContainer(默认实现为SimpleMessageListenerContainer)

// 通过BeanPostProcessor在Bean创建后,创建消息监听器
public class RabbitListenerAnnotationBeanPostProcessor
		implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
			SmartInitializingSingleton {
  ......
      @Override
      public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
      Class<?> targetClass = AopUtils.getTargetClass(bean);
      // 通过反射获取@RabbitListener修饰的方法
      final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
      for (ListenerMethod lm : metadata.listenerMethods) {
          for (RabbitListener rabbitListener : lm.annotations) {
              // 创建MethodRabbitListenerEndpoint,并注册到RabbitListenerEndpointRegistrar
              processAmqpListener(rabbitListener, lm.method, bean, beanName);
          }
      }
      if (metadata.handlerMethods.length > 0) {
          processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
      }
      return bean;
  }

	protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
	}
	// 创建RabbitMQ消费者核心逻辑
	protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
			Object adminTarget, String beanName) {
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(rabbitListener));
		// resolveQueues方法会处理创建队列的工作
		endpoint.setQueueNames(resolveQueues(rabbitListener));
		.......
	// registerEndpoint()里核心创建MessageListenerContainer,其默认实现是SimpleMessageListenerContainer
	this.registrar.registerEndpoint(endpoint, factory);
}
 ......
}

1.3 PS: BeanPostPorcessor如何被Spring处理?

虽然大家都很熟悉Spring Bean初始化流程里,但唠叨一下
调用链路:getBean -> doGetBean -> createBean -> initializeBean
->applyBeanPostProcessorsBeforeInitialization -> applyBeanPostProcessorsAfterInitialization

public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
		implements AutowireCapableBeanFactory{
	// 创建一个Bean实例对象,应用post-processors
  protected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException {
        // 各种准备工作
        ......
       // 最后调用doCreateBean
      Object beanInstance = doCreateBean(beanName, mbdToUse, args);
      if (logger.isDebugEnabled()) {
        logger.debug("Finished creating instance of bean '" + beanName + "'");
      }
      return beanInstance;
  }    

	protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[] args)
			throws BeanCreationException {
      ......
    // Initialize the bean instance.
		Object exposedObject = bean;
		try {
			populateBean(beanName, mbd, instanceWrapper);
			if (exposedObject != null) {
        // 调用initializeBean
				exposedObject = initializeBean(beanName, exposedObject, mbd);
			}
		}
		catch (Throwable ex) {
      .....
		}
 }
 
     // 初始化Bean实例
	protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) {
      ......
      if (mbd == null || !mbd.isSynthetic()) {
        wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
      }
      try {
        invokeInitMethods(beanName, wrappedBean, mbd);
      }
      catch (Throwable ex) {
        throw new BeanCreationException(
            (mbd != null ? mbd.getResourceDescription() : null),
            beanName, "Invocation of init method failed", ex);
      }
      if (mbd == null || !mbd.isSynthetic()) {
        wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
      } 
      return wrappedBean;
  	} 
}

2. RabbitMQ消息如何被消费

2.1 SimpleMessageListenerContainer

上面说了消费者启动会创建SimpleMessageListenerContainer,它启动时会创建一个AsyncMessageProcessingConsumer内部类的对象(实现了Runnable接口,核心属性是BlockingQueueConsumer),AsyncMessageProcessingConsumer的run()通过while循环不断接收消息并调用我们使用@RabbitListener修饰的方法实现的消费逻辑。

	@Override
	protected void doStart() throws Exception {
		......
		super.doStart();
		synchronized (this.consumersMonitor) {
			if (this.consumers != null) {
				throw new IllegalStateException("A stopped container should not have consumers");
			}
			// 根据配置的并发数创建对应数量BlockingQueueConsumer 
			int newConsumers = initializeConsumers();
		......
			Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
			for (BlockingQueueConsumer consumer : this.consumers) {
				AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
				processors.add(processor);
				// 执行AsyncMessageProcessingConsumer,轮询调用获取队列里的消息并执行消费逻辑
				getTaskExecutor().execute(processor);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
				}
			}
			for (AsyncMessageProcessingConsumer processor : processors) {
				FatalListenerStartupException startupException = processor.getStartupException();
				if (startupException != null) {
					throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
				}
			}
		}
	}

2.2 BlockingQueueConsumer

BlockingQueueConsumer扮演一个解耦消息接收和消息消费的角色,一方面负责承接Channel接收的消息并压入BlockingQueue queue,另一方面被AsyncMessageProcessingConsumer轮询调用获取队列里的消息并执行消费逻辑。

	// 从队列中获取消息
	public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
		......
		Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
		if (message == null && this.cancelled.get()) {
			throw new ConsumerCancelledException();
		}
		return message;
	}

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
				throws IOException {
			......
			try {
				// 如果BlockingQueueConsumer已被标记为停止,调用offer将消息入队,如果队列满了会马上返回false
				if (BlockingQueueConsumer.this.abortStarted > 0) {
					//如果offer失败,发送basic.nack命令通知服务端消息没有消费成功,然后发送basic.cancel命令通知服务端取消订阅,服务端不再发送消息到该消费者
					if (!BlockingQueueConsumer.this.queue.offer(
							new Delivery(consumerTag, envelope, properties, body, this.queue),
							BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

						RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
						// Defensive - should never happen
						BlockingQueueConsumer.this.queue.clear();
						getChannel().basicNack(envelope.getDeliveryTag(), true, true);
						getChannel().basicCancel(consumerTag);
						try {
							getChannel().close();
						}
						catch (TimeoutException e) {
							// no-op
						}
					}
				}
				else {
				// 如果BlockingQueueConsumer没有标记为停止,调用put入队,如果队列空间满了则会一直等待直到空间可用
					BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queue));
				}
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}

标签:beanName,mbd,Spring,Object,RabbitMQ,bean,源码,BlockingQueueConsumer,new
来源: https://blog.csdn.net/AnIllusion/article/details/123031367

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有