Рука об руку с Springboot3 для интеграции RocketMQ2.3
Рука об руку с Springboot3 для интеграции RocketMQ2.3

Введение в окружающую среду

стек технологий

springboot+rocketmq

программное обеспечение

Версия

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

rocketmq

4.9.4

Основные понятия RocketMQ

Модель сообщения Модель сообщения

RocketMQ в основном состоит из трех частей: Производитель, Брокер и Потребитель отвечает за создание сообщений, Потребитель отвечает за потребление сообщений, а Брокер отвечает за хранение сообщений. Брокер соответствует одному серверу во время фактического процесса развертывания. Каждый Брокер может хранить сообщения нескольких Тем, а также сообщения каждой Темы могут храниться в разных Брокерах во фрагментах. Очередь сообщений используется для хранения физического адреса сообщения. Адрес сообщения в каждой теме хранится в нескольких очередях сообщений. ConsumerGroup состоит из нескольких экземпляров Consumer.

Производитель сообщения Продюсер

Ответственный за создание сообщений. Как правило, за создание сообщений отвечает бизнес-система. Производитель сообщений будет отправлять сообщения, сгенерированные в системе бизнес-приложений, на сервер брокера. RocketMQ предоставляет несколько методов отправки, включая синхронную отправку, асинхронную отправку, последовательную отправку и одностороннюю отправку. И синхронные, и асинхронные методы требуют, чтобы брокер возвращал подтверждающую информацию, но односторонняя отправка этого не требует.

Потребитель сообщений

Отвечает за получение сообщений. Обычно за асинхронное потребление отвечает фоновая система. Потребитель сообщений извлекает сообщения с сервера брокера и передает их приложению. С точки зрения пользовательских приложений предусмотрены две формы потребления: потребление по запросу и потребление по запросу.

Тема

Представляет коллекцию сообщений определенного типа. Каждая тема содержит несколько сообщений. Каждое сообщение может принадлежать только одной теме. Это базовая единица RocketMQ для подписки на сообщения.

Брокерский сервер

Роль ретранслятора сообщений отвечает за хранение и пересылку сообщений. Прокси-сервер в системе RocketMQ отвечает за получение сообщений, отправленных от производителей, их хранение и подготовку запросов на включение от потребителей. Прокси-сервер также хранит метаданные, связанные с сообщениями, включая группы потребителей, смещения прогресса потребления, а также сообщения тем и очередей.

Сервер имен Сервер имен

Служба имен действует как поставщик маршрутизируемых сообщений. Производители или потребители могут найти список IP-адресов брокера, соответствующий каждой теме, через службу имен. Несколько экземпляров Namesrv образуют кластер, но они независимы друг от друга и не обмениваются информацией.

Pom.xml добавляет зависимости

Язык кода:java
копировать
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
 <version>2.3.0</version>
</dependency>
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>2.0.32</version>
</dependency>

конфигурация application.yml

Язык кода:java
копировать
rocketmq:
 name-server: 192.168.68.133:9876
 producer:
 #продюсер Название группы должно быть уникальным в пределах приложения
 group: test-producer
 #Тайм-аут отправки сообщения По умолчанию 3000 мс
 send-message-timeout: 3000
 #Когда сообщение достигнет размера 4096 байт, оно будет сжато. по умолчанию 4096
 compress-message-body-threshold: 4096
 #Максимальный лимит сообщений, по умолчанию — 128 КБ
 max-message-size: 4194304
 #Количество попыток отправки сообщения о неудачной синхронизации
 retry-times-when-send-failed: 2
 #Стоит ли повторять попытки использования других брокеров в случае сбоя внутренней отправки. Этот параметр вступает в силу только при наличии нескольких брокеров.
 retry-next-server: true
 #Количество попыток неудачной отправки асинхронного сообщения
 retry-times-when-send-async-failed: 2

потребительский слушатель

Если сообщается об ошибке, это означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно поставлено в очередь в соответствии с конфигурацией application.yml.

@Component @RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot") public class RocketListener implements RocketMQListener<MessageExt> { /** * onMessage потребительский метод * @param messages Содержание сообщения */ @Override public void onMessage(MessageExt messages) { /Если вы не сообщаете об ошибке, вы подписываетесь для получения информации. //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. System.out.println("полученныйинформация:"+new String(messages.getBody()));

} }

Отправить сообщение синхронизации

продюсер

Язык кода:java
копировать
@Test
void sendMsg() {
 /**
 * Отправить сообщение синхронизации
 * destination тема назначения
 * payload информация
 */
 rocketMQTemplate.syncSend("TopicTest", «синхронная информация»);
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage потребительский метод
 * @param messages Содержание сообщения
 */
 @Override
 public void onMessage(MessageExt messages) {
 //Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
 //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
 System.out.println("Получено сообщение: "+new String(messages.getBody()));
 }
}

Отправлять асинхронные сообщения

продюсер

Язык кода:java
копировать
@Test
void asyncTest() {
 /**
 * Отправлять асинхронные сообщения
 * destination тема назначения
 * payload информация
 */
 rocketMQTemplate.asyncSend("TopicTest", "асинхронныйинформация", new SendCallback() {
 @Override
 public void onSuccess(SendResult sendResult) {
 System.out.println("Отправлено успешно");
 }

 @Override
 public void onException(Throwable throwable) {
 System.out.println("Отправка не удалась");
 }
 });
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage потребительский метод
 * @param messages Содержание сообщения
 */
 @Override
 public void onMessage(MessageExt messages) {
 //Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
 //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
 System.out.println("Получено сообщение: "+new String(messages.getBody()));
 }
}

Отправить отложенное сообщение

продюсер

Язык кода:java
копировать
@Test
void delayTest() {
 /**
 * Отправить отложенное сообщение
 * destination тема назначения
 * payload информация
 * timestamp Тайм-аут соединения
 * delayLevel Уровень задержки
 */
 Message<String> msg = MessageBuilder.withPayload("задерживатьинформация").build();
 rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage потребительский метод
 * @param messages Содержание сообщения
 */
 @Override
 public void onMessage(MessageExt messages) {
 //Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
 //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
 System.out.println("Получено сообщение: "+new String(messages.getBody()));
 }
}

Отправить одностороннее сообщение

продюсер

Язык кода:java
копировать
@Test
void OneWayTest() {
 /**
 * Отправить одностороннее сообщение
 * destination тема назначения
 * payload информация
 */
 rocketMQTemplate.sendOneWay("TopicTest", «односторонняя информация»);
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage потребительский метод
 * @param messages Содержание сообщения
 */
 @Override
 public void onMessage(MessageExt messages) {
 //Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
 //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
 System.out.println("Получено сообщение: "+new String(messages.getBody()));
 }
}

Отправляйте последовательные сообщения

последовательное сообщение продюсеру необходимо отправить группу информации в одну очередь , потребитель требует однопоточного потребления

продюсер

продюсеру необходимо отправить группу информации в одну очередь

Язык кода:java
копировать
List<MessageM> messageMs = Arrays.asList(
 new MessageM("sn0001", 1, «Оформить заказ»),
 new MessageM("sn0001", 1, "Оплата"),
 new MessageM("sn0001", 1, "Доставка"),
 new MessageM("sn0002", 2, «Оформить заказ»),
 new MessageM("sn0002", 2, "Оплата"),
 new MessageM("sn0002", 2, "Доставка")
);


@Test
void orderlyTest() {
 /**

 * destination тема назначения
 * payload информация




 */
 for (MessageM messageM : messageMs) {
 rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
 }
}

потребитель

ОДНОВРЕМЕННО одновременно

ПОРЯДОЧНО

потребитель требует потребления одного потока

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {

 @Override
 public void onMessage(MessageExt messageExt) {
 MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
 System.out.println(messageM);
 }
}

Отправить с тегом

продюсер

Язык кода:java
копировать
@Test
void ProducerTagTest(){
 rocketMQTemplate.syncSend("TagMQ:tagA","Информация tagA");
 rocketMQTemplate.syncSend("TagMQ:tagB","приноситьtagBизинформация");
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "TagMQ",
 consumerGroup = "TagMQGroup",
 selectorType = SelectorType.TAG, //режим фильтра тегов
 selectorExpression = "tagA || tagB"

)
public class MsgListenerTag implements RocketMQListener<MessageExt> {

 @Override
 public void onMessage(MessageExt messageExt) {
 System.out.println(new String( messageExt.getBody()));
 }
}

Отправить сообщение с ключом

Ключ передается в заголовке сообщения.

продюсер

Язык кода:java
копировать
@Test
void keyTest(){
 String Key = UUID.randomUUID().toString();
 Message<String> msg = MessageBuilder
 .withPayload("приноситьkeyинформация").
 setHeader(RocketMQHeaders.KEYS, Key)
 .build();
 /**
 * приноситьKeyинформация
 */
 rocketMQTemplate.syncSend("ketTopic",msg);
}

потребитель

Язык кода:java
копировать
@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage потребительский метод
 * @param messages Содержание сообщения
 */
 @Override
 public void onMessage(MessageExt messages) {
 //Если вы не сообщаете об ошибке, вы подписываетесь для получения информации.
 //Сообщение об ошибке означает повторную попытку. Если сообщение об ошибке все еще сообщается после трех повторных попыток, это означает, что потребление не удалось и оно будет повторно введено в очередь. (согласно конфигурации yml)
 System.out.println("Получено сообщение: "+new String(messages.getBody()));
 System.out.println("key:"+messages.getKeys());
 }
}

Я участвую в последнем конкурсе эссе для специального учебного лагеря Tencent Technology Creation 2024. Приходите и разделите со мной приз!

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.