Введение в окружающую среду | |
---|---|
стек технологий | springboot+rocketmq |
программное обеспечение | Версия |
mysql | 8 |
IDEA | IntelliJ IDEA 2022.2.1 |
JDK | 17 |
Spring Boot | 3.1.7 |
rocketmq | 4.9.4 |
Модель сообщения Модель сообщения
RocketMQ в основном состоит из трех частей: Производитель, Брокер и Потребитель отвечает за создание сообщений, Потребитель отвечает за потребление сообщений, а Брокер отвечает за хранение сообщений. Брокер соответствует одному серверу во время фактического процесса развертывания. Каждый Брокер может хранить сообщения нескольких Тем, а также сообщения каждой Темы могут храниться в разных Брокерах во фрагментах. Очередь сообщений используется для хранения физического адреса сообщения. Адрес сообщения в каждой теме хранится в нескольких очередях сообщений. ConsumerGroup состоит из нескольких экземпляров Consumer.
Производитель сообщения Продюсер
Ответственный за создание сообщений. Как правило, за создание сообщений отвечает бизнес-система. Производитель сообщений будет отправлять сообщения, сгенерированные в системе бизнес-приложений, на сервер брокера. RocketMQ предоставляет несколько методов отправки, включая синхронную отправку, асинхронную отправку, последовательную отправку и одностороннюю отправку. И синхронные, и асинхронные методы требуют, чтобы брокер возвращал подтверждающую информацию, но односторонняя отправка этого не требует.
Потребитель сообщений
Отвечает за получение сообщений. Обычно за асинхронное потребление отвечает фоновая система. Потребитель сообщений извлекает сообщения с сервера брокера и передает их приложению. С точки зрения пользовательских приложений предусмотрены две формы потребления: потребление по запросу и потребление по запросу.
Тема
Представляет коллекцию сообщений определенного типа. Каждая тема содержит несколько сообщений. Каждое сообщение может принадлежать только одной теме. Это базовая единица RocketMQ для подписки на сообщения.
Брокерский сервер
Роль ретранслятора сообщений отвечает за хранение и пересылку сообщений. Прокси-сервер в системе RocketMQ отвечает за получение сообщений, отправленных от производителей, их хранение и подготовку запросов на включение от потребителей. Прокси-сервер также хранит метаданные, связанные с сообщениями, включая группы потребителей, смещения прогресса потребления, а также сообщения тем и очередей.
Сервер имен Сервер имен
Служба имен действует как поставщик маршрутизируемых сообщений. Производители или потребители могут найти список IP-адресов брокера, соответствующий каждой теме, через службу имен. Несколько экземпляров Namesrv образуют кластер, но они независимы друг от друга и не обмениваются информацией.
Pom.xml добавляет зависимости
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>
конфигурация application.yml
rocketmq:
name-server: 192.168.68.133:9876
producer:
#продюсер Название группы должно быть уникальным в пределах приложения
group: test-producer
#Тайм-аут отправки сообщения По умолчанию 3000 мс
send-message-timeout: 3000
#Когда сообщение достигнет размера 4096 байт, оно будет сжато. по умолчанию 4096
compress-message-body-threshold: 4096
#Максимальный лимит сообщений, по умолчанию — 128 КБ
max-message-size: 4194304
#Количество попыток отправки сообщения о неудачной синхронизации
retry-times-when-send-failed: 2
#Стоит ли повторять попытки использования других брокеров в случае сбоя внутренней отправки. Этот параметр вступает в силу только при наличии нескольких брокеров.
retry-next-server: true
#Количество попыток неудачной отправки асинхронного сообщения
retry-times-when-send-async-failed: 2
Если сообщается об ошибке, это означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно поставлено в очередь в соответствии с конфигурацией application.yml.
@Component @RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot") public class RocketListener implements RocketMQListener<MessageExt> { /** * onMessage потребительский метод * @param messages Содержание сообщения */ @Override public void onMessage(MessageExt messages) { /Если вы не сообщаете об ошибке, вы подписываетесь для получения информации. //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. System.out.println("полученныйинформация:"+new String(messages.getBody()));
} }
продюсер
@Test
void sendMsg() {
/**
* Отправить сообщение синхронизации
* destination тема назначения
* payload информация
*/
rocketMQTemplate.syncSend("TopicTest", «синхронная информация»);
}
потребитель
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {
/**
* onMessage потребительский метод
* @param messages Содержание сообщения
*/
@Override
public void onMessage(MessageExt messages) {
//Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
//Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
System.out.println("Получено сообщение: "+new String(messages.getBody()));
}
}
продюсер
@Test
void asyncTest() {
/**
* Отправлять асинхронные сообщения
* destination тема назначения
* payload информация
*/
rocketMQTemplate.asyncSend("TopicTest", "асинхронныйинформация", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Отправлено успешно");
}
@Override
public void onException(Throwable throwable) {
System.out.println("Отправка не удалась");
}
});
}
потребитель
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {
/**
* onMessage потребительский метод
* @param messages Содержание сообщения
*/
@Override
public void onMessage(MessageExt messages) {
//Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
//Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
System.out.println("Получено сообщение: "+new String(messages.getBody()));
}
}
продюсер
@Test
void delayTest() {
/**
* Отправить отложенное сообщение
* destination тема назначения
* payload информация
* timestamp Тайм-аут соединения
* delayLevel Уровень задержки
*/
Message<String> msg = MessageBuilder.withPayload("задерживатьинформация").build();
rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}
потребитель
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {
/**
* onMessage потребительский метод
* @param messages Содержание сообщения
*/
@Override
public void onMessage(MessageExt messages) {
//Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
//Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
System.out.println("Получено сообщение: "+new String(messages.getBody()));
}
}
продюсер
@Test
void OneWayTest() {
/**
* Отправить одностороннее сообщение
* destination тема назначения
* payload информация
*/
rocketMQTemplate.sendOneWay("TopicTest", «односторонняя информация»);
}
потребитель
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {
/**
* onMessage потребительский метод
* @param messages Содержание сообщения
*/
@Override
public void onMessage(MessageExt messages) {
//Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
//Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
System.out.println("Получено сообщение: "+new String(messages.getBody()));
}
}
последовательное сообщение продюсеру необходимо отправить группу информации в одну очередь , потребитель требует однопоточного потребления
продюсер
продюсеру необходимо отправить группу информации в одну очередь
List<MessageM> messageMs = Arrays.asList(
new MessageM("sn0001", 1, «Оформить заказ»),
new MessageM("sn0001", 1, "Оплата"),
new MessageM("sn0001", 1, "Доставка"),
new MessageM("sn0002", 2, «Оформить заказ»),
new MessageM("sn0002", 2, "Оплата"),
new MessageM("sn0002", 2, "Доставка")
);
@Test
void orderlyTest() {
/**
* destination тема назначения
* payload информация
*/
for (MessageM messageM : messageMs) {
rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
}
}
потребитель
ОДНОВРЕМЕННО одновременно
ПОРЯДОЧНО
потребитель требует потребления одного потока
@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
System.out.println(messageM);
}
}
продюсер
@Test
void ProducerTagTest(){
rocketMQTemplate.syncSend("TagMQ:tagA","Информация tagA");
rocketMQTemplate.syncSend("TagMQ:tagB","приноситьtagBизинформация");
}
потребитель
@Component
@RocketMQMessageListener(topic = "TagMQ",
consumerGroup = "TagMQGroup",
selectorType = SelectorType.TAG, //режим фильтра тегов
selectorExpression = "tagA || tagB"
)
public class MsgListenerTag implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
System.out.println(new String( messageExt.getBody()));
}
}
Ключ передается в заголовке сообщения.
продюсер
@Test
void keyTest(){
String Key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder
.withPayload("приноситьkeyинформация").
setHeader(RocketMQHeaders.KEYS, Key)
.build();
/**
* приноситьKeyинформация
*/
rocketMQTemplate.syncSend("ketTopic",msg);
}
потребитель
@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {
/**
* onMessage потребительский метод
* @param messages Содержание сообщения
*/
@Override
public void onMessage(MessageExt messages) {
//Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
//Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
System.out.println("Получено сообщение: "+new String(messages.getBody()));
System.out.println("key:"+messages.getKeys());
}
}