В этой статье в основном изучается механизм реализации RocketMQMessageListener.
@Service
@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName());
}
}
Класс, реализующий интерфейс RocketMQListener и в сочетании с аннотацией @RocketMQMessageListener, может использовать сообщения rocketmq.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQListener.java
public interface RocketMQListener<T> {
void onMessage(T message);
}
Интерфейс RocketMQListener определяет метод onMessage.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
*
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
String consumerGroup();
/**
* Topic name.
*/
String topic();
/**
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
/**
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* Max consumer thread number.
* This property control consumer thread pool executor maximumPoolSize see
* {@link ConsumeMessageService#updateCorePoolSize(int)}
* @see <a href="https://github.com/apache/rocketmq-spring/issues/546">issues#546</a>
*/
int consumeThreadMax() default 64;
/**
* consumer thread number.
*/
int consumeThreadNumber() default 20;
/**
* Max re-consume times.
*
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
*/
int maxReconsumeTimes() default -1;
/**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
long consumeTimeout() default 15L;
/**
* Timeout for sending reply messages.
*/
int replyTimeout() default 3000;
/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
/**
* The property of "tlsEnable" default false.
*/
String tlsEnable() default "false";
/**
* The namespace of consumer.
*/
String namespace() default "";
/**
* Message consume retry strategy in concurrently mode.
*
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
*/
int delayLevelWhenNextConsume() default 0;
/**
* The interval of suspending the pull in orderly mode, in milliseconds.
*
* The minimum value is 10 and the maximum is 30000.
*/
int suspendCurrentQueueTimeMillis() default 1000;
/**
* Maximum time to await message consuming when shutdown consumer, in milliseconds.
* The minimum value is 0
*/
int awaitTerminationMillisWhenShutdown() default 1000;
/**
* The property of "instanceName".
*/
String instanceName() default "DEFAULT";
}
RocketMQMessageListener — это группа ConsumerGroup、topic、selectorType、selectorExpression、consumeMode、messag. eModel、consumeThreadMax、consumeThreadNumber、maxReconsumeTimes、consumeTimeout、replyTimeout、accessKey 、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel、tlsEnable、namespace、delayLevelWhenNextConsume、suspendCurrentQueueTimeMillis、awaitTerminationMillisWhenShutdown、Атрибут имя_экземпляра
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
private ApplicationContext applicationContext;
private AnnotationEnhancer enhancer;
private ListenerContainerConfiguration listenerContainerConfiguration;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
if (ann != null) {
RocketMQMessageListener enhance = enhance(targetClass, ann);
if (listenerContainerConfiguration != null) {
listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
}
}
return bean;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void afterPropertiesSet() throws Exception {
buildEnhancer();
this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
}
private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
.collect(Collectors.toList());
this.enhancer = (attrs, element) -> {
Map<String, Object> newAttrs = attrs;
for (AnnotationEnhancer enh : enhancers) {
newAttrs = enh.apply(newAttrs, element);
}
return attrs;
};
}
}
}
private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
if (this.enhancer == null) {
return ann;
} else {
return AnnotationUtils.synthesizeAnnotation(
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
}
}
public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> {
}
}
RocketMQMessageListenerBeanPostProcessor реализует интерфейсы ApplicationContextAware, BeanPostProcessor и InitializingBean. Метод postProcessAfterInitialization определяет, имеет ли компонент аннотации RocketMQMessageListener. Если да, он будет улучшен посредством Enhance. Кроме того, он будет зарегистрирован через RegisterContainer в ListenerContainerConfiguration. Его метод afterPropertiesSet выполнит метод buildEnhancer.,Этот метод получит экземпляр bean-компонента AnnotationEnhancer.,Тогда организуйтепоследовательностьхороший,Наконец создайте AnnotationEnhancer,Его функция заключается в применении этих усилителей один за другим.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
}
if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
}
String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
log.debug(
"Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
consumerGroup, topic);
return;
}
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}
Метод RegisterContainer в ListenerContainerConfiguration создаст DefaultRocketMQListenerContainer на основе информации аннотации и соответствующего bean-компонента, зарегистрирует его в genericApplicationContext и одновременно выполнит его метод запуска.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
private DefaultMQPushConsumer consumer;
private RocketMQListener rocketMQListener;
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException("container already running. " + this.toString());
}
try {
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
private void initRocketMQPushConsumer() throws MQClientException {
if (rocketMQListener == null && rocketMQReplyListener == null) {
throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
}
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
if (Objects.nonNull(rpcHook)) {
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
} else {
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
}
consumer.setNamespace(namespace);
String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
if (customizedNameServer != null) {
consumer.setNamesrvAddr(customizedNameServer);
} else {
consumer.setNamesrvAddr(nameServer);
}
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
consumer.setInstanceName(instanceName);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(new Boolean(tlsEnable));
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}
}
//......
}
DefaultRocketMQListenerContainerизstartметод(
SmartLifecycle
)осуществлятьconsumerизstartметод;ЧтоafterPropertiesSetметод(InitializingBean
)встречаосуществлятьinitRocketMQPushConsumerметодсоздатьconsumer initRocketMQPushConsumerметод В основном создаватьDefaultMQPushConsumer,Установить модель сообщения,Выполнить метод подписки в соответствии с selectorType,в соответствии сconsumeModeустановитьmessageListener(DefaultMessageListenerOrderly — DefaultMessageListenerConcurrently
),Наконец, выполните метод подготовитьStart для RocketMQPushConsumerLifecycleListener.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
@SuppressWarnings("unchecked")
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
DefaultMessageListenerOrderly реализует интерфейс ConsumerMessage MessageListenerOrderly. Он внутренне просматривает сообщения и выполняет handleMessage одно за другим. Если есть исключение, он устанавливает context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis), а затем возвращает ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
DefaultMessageListenerConcurrently реализует метод ConsumerMessage интерфейса MessageListenerConcurrently. Он внутренне просматривает сообщения и выполняет handleMessage одно за другим. Если есть исключение, установите context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume), а затем верните ConsumeConcurrentlyStatus.RECONSUME_LATER.
private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListener != null) {
rocketMQListener.onMessage(doConvertMessage(messageExt));
} else if (rocketMQReplyListener != null) {
Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
Message<?> message = MessageBuilder.withPayload(replyContent).build();
org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
producer.setSendMsgTimeout(replyTimeout);
producer.send(replyMessage, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}
@Override public void onException(Throwable e) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
}
}
Метод handleMessage делегируется методу rocketMQListener.onMessage(doConvertMessage(messageExt)), который вызывает обратно настроенный для бизнеса RocketMQListener.
org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
/**
* This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
*
* @throws MQClientException if there is any client error.
*/
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
Метод start DefaultMQPushConsumer сначала выполняет setConsumerGroup, а затем делегирует его defaultMQPushConsumerImpl.start(). Если существует трассировочный диспетчер, выполняется метод трассировкиDispatcher.start. Метод defaultMQPushConsumerImpl.start() запускает MQClientInstance.start(), который запускает pullMessageService.start(). и rebalanceService.start()(
DefaultMQPushConsumerImpl.executePullRequestImmediately будет запущен во время doRebalance и поместит pullRequest в pullRequestQueue.
),Первый получит pullRequest из pullRequestQueue, а затем выполнит метод DefaultMQPushConsumerImpl.pullMessage.,Внутри выполняется pullAPIWrapper.pullKernelImpl.,Затем передайте pullCallback вprocessQueue.putMessage.,Снова запустите ConsumerMessageService.submitConsumeRequest.,Он выполнит обратный вызов Listener.consumeMessage для использования сообщения.
RocketMQMessageListenerBeanPostProcessor реализует интерфейсы ApplicationContextAware, BeanPostProcessor и InitializingBean. Метод postProcessAfterInitialization определяет, имеет ли компонент аннотации RocketMQMessageListener. Если да, он будет улучшен посредством Enhance. Кроме того, он будет зарегистрирован через RegisterContainer в ListenerContainerConfiguration.
Метод RegisterContainer в ListenerContainerConfiguration создаст DefaultRocketMQListenerContainer на основе информации аннотации и соответствующего bean-компонента, зарегистрирует его в genericApplicationContext и одновременно выполнит его метод запуска. DefaultRocketMQListenerContainerизstartметод(
SmartLifecycle
)осуществлятьconsumerизstartметод;ЧтоafterPropertiesSetметод(InitializingBean
)встречаосуществлятьinitRocketMQPushConsumerметодсоздатьconsumer Метод start в основном выполняет pullMessageService.start() и rebalanceService.start(). Первый отвечает за получение pullRequest из pullRequestQueue, а затем загрузку сообщения в ProcessQueue, а затем запуск обратного вызова Listener.consumeMessage для использования сообщения; последний отвечает за ребалансировку и будет срабатывать в начале defaultMQPushConsumerImpl.executePullRequestImmediately, то есть поместить pullRequest в pullRequestQueue. pushConsumer по существу основан на режиме извлечения, полученном из RocketMQMessageListenerBeanPostProcessor. --> DefaultRocketMQListenerContainer.start --> DefaultMQPushConsumer.start --> defaultMQPushConsumerImpl.start() --> pullMessageService.start() и rebalanceService.start()