Сегодня мы узнаем об общих очередях сообщений RabbitMQ, RocketMQ и Kafka из статьи.
Exchange Рассылайте сообщения по Queue час, Exchange Типы соответствуют разным стратегиям дистрибуции, существует 3 типа. Exchange :Direct、Fanout、Topic。
TTL (Time To Live): время жить. RabbitMQ поддерживает время истечения срока действия сообщения, всего 2 типа.
Подтвердите механизм:
Как реализовать сообщение подтверждения подтверждения?
Механизм обратного сообщения:
Return ListenerИспользуется для обработки некоторых немаршрутизируемых сообщений.。
Наш производитель сообщений отправляет сообщение в определенную очередь, указывая Exchange и маршрутизацию, а затем наш потребитель прослушивает очередь, чтобы выполнить операции потребления и обработки сообщений.
Но в некоторых случаях, если когда мы отправляем сообщение, текущий обмен не существует или указанный ключ маршрутизации не может быть маршрутизирован. В это время нам нужно прослушивать такие недоступные сообщения, и нам нужно использовать Returnrn Listener.
В базовом API есть ключевой элемент конфигурации Mandatory: если он равен true, прослушиватель получит сообщение о том, что маршрут недоступен, и затем обработает его. Если оно ложно, брокер автоматически удалит сообщение.
Аналогично, при прослушивании chennel.addReturnListener(ReturnListener rl) передает ReturnListener, который переопределил метод handleReturn.
Когда потребитель осуществляет потребление,В случае возникновения бизнес-аномалий можно вести журналы.,Тогда компенсируйте. Но в случае серьезных проблем, таких как простой сервера,,мы должныРучное подтверждениеГарантировать потребительское потреблениестановитьсядостижение。
// DeliveryTag: сообщение уникально идентифицировано в mqv.
// Multiple: пакетировать ли (настройки iqos аналогичны параметрам)
// requeue: Вам нужно вернуться в очередь. Или Сбросьте или вернитесь к главе команды, чтобы снова съесть.
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
Как указано выше, код,информациясуществоватьПотребитель возвращается в очередьЭто ради того, чтобы быть правым?становитьсядостижениеиметь дело синформация,Верните сообщение Брокеру. Вообще говоря,В реальных приложениях он будет закрыт и возвращен вочередь(Избегайте входа в бесконечный цикл),То есть установлено значение false.
очередь недоставленных писем(DLX Dead-Letter-Exchange): когда сообщение становится неработающим в одной очереди, оно будет повторно отправлено в другую очередь, эта очередь является очередью. недоставленных писем。
DLX — это тоже обычный Exchange, ничем не отличающийся от обычного Exchange. Его можно указать в любой очереди, что собственно и задает свойства очереди.
Если в этой очереди есть плохое сообщение, RabbitMQ автоматически повторно опубликует личное сообщение на Exchange, а затем перенаправит его в другую очередь.
Официальный продукт Alibaba для обмена сообщениями для Double Eleven поддерживает все службы обмена сообщениями Alibaba Group. Он прошел строгие испытания на высокую доступность и надежность на протяжении более десяти лет и является основным продуктом транзакционной линии Alibaba.
Ракета: Ракета означает.
У него есть следующие основные концепции: брокер, тема, тег, очередь сообщений, сервер имен, группа, смещение, производитель и потребитель.
Давайте представим это подробно ниже.
Версия с открытым исходным кодом RocketMQ не поддерживает произвольную точность времени.,Поддерживает только определенный уровень,Например, таймер 5с,10s,1 минута и так далее. в,level=0Уровень означает отсутствие расширениячас,level=1 означает продление уровня 1, час,level=2 означает продление уровня 2 в час,И так далее.
Уровни задержки следующие:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Порядок сообщений означает, что сообщения могут использоваться в том порядке, в котором они были отправлены (FIFO). RocketMQ может строго гарантировать порядок сообщений, который можно разделить на порядок разделов или глобальный порядок.
Message Queuing MQ предоставляет аналогичные XAиз функциональности распределенных транзакций через обмен сообщениями TurnMQ Сообщения о В транзакциях можно добиться согласованности распределенных транзакций. Картинка выше иллюстрирует Сообщения. о транзакцияхиз Общий процесс:нормальный Сообщения о транзакцияхиз Отправить и отправить、Сообщения о транзакцияхиз компенсационного процесса.
Сообщения о транзакцияхотправлятьипредставлять на рассмотрение:
Сообщения о транзакцияхиз Процесс компенсации:
Среди них фаза компенсации используется для решения проблемы тайм-аута или сбоя сообщения Commit или Rollback.
Сообщения о транзакцияхсостояние:
Сообщения о транзакциях имеют три статуса: статус подачи.、статус отката、Промежуточное состояние:
RocketMQ по своей сути является распределенным и может быть настроен с использованием режима «главный-подчиненный» и горизонтального расширения.
Брокер в роли Мастера поддерживает чтение и запись, а Брокер в роли Подчинённого поддерживает только чтение, то есть Производитель может подключаться только к Брокеру в роли Мастера для написания сообщений. Потребитель может подключаться к Брокеру в роли Мастера; Ведущая роль, а также возможность подключения к Брокеру в подчиненной роли для чтения информации.
Высокая доступность потребления сообщений (master-slave):
В файле конфигурации Потребителя нет необходимости устанавливать, следует ли читать с Мастера или Слейва. Когда Мастер недоступен или занят, Потребитель автоматически переключится на чтение с Подчиненного. Благодаря механизму автоматического переключения потребителей, когда машина в роли Мастера выходит из строя, Потребитель все равно может читать сообщения от Подчиненного, не затрагивая программу Потребителя.
До версии 4.5, если Мастер-нода зависает, Подчиненный узел не может автоматически переключиться на мастер-ноду. В это время необходимо вручную остановить Брокер в роли Подчиненного, изменить файл конфигурации и запустить Брокер с новым. файл конфигурации. Но после версии 4.5 в RocketMQ появился механизм синхронизации Dledger. В это время, если мастер-узел зависает, Dledger выберет новый мастер-узел через протокол Raft без необходимости вручную изменять конфигурацию.
Отправка сообщений имеет высокую доступность (настройте несколько главных узлов):
При создании темы создайте несколько очередей сообщений темы в нескольких группах брокеров (машины с одинаковым именем брокера и разными идентификаторами брокера образуют группу брокеров), чтобы, когда мастер одной группы брокеров становится недоступным, мастера других групп все равно доступен, продюсер все равно может отправлять сообщения.
Репликация «главный-подчиненный»:
Если в группе брокеров есть ведущий и подчиненный устройства, сообщения необходимо копировать с главного устройства на подчиненное. Существует два метода репликации: синхронный и асинхронный.
В обычных обстоятельствах главный и подчиненный устройства должны быть настроены в режиме синхронной очистки диска, а главный и подчиненный устройства должны быть настроены в режиме асинхронной репликации. Таким образом, даже если один компьютер выйдет из строя, данные все равно будут гарантированно не потеряны. что является хорошим выбором.
Producerбалансировка нагрузки:
Сторона продюсера,Когда каждый экземпляр отправляет сообщение,,По умолчанию будетопросвсеизMessage Очередь отправляется для того, чтобы сообщения могли равномерно попадать в разные очереди. Поскольку очередь может быть разбросана по разным брокерам, сообщения отправляются разным брокерам, как показано ниже:
Consumerбалансировка нагрузки:
Если количество экземпляров Consumer больше, чем количество сообщений Общее количество Очередейиз слов еще больше,Дополнительные экземпляры Consumer не будут помещены в очередь.,Следовательно, сообщение не может быть использовано.,Он не сможет разделить нагрузку. Что необходимо контролировать, так это сделать общее количество очередей больше или равным числу потребителей.
Небалансировка для режима вещания нагрузкаиз, с просьбой доставить сообщение личному лицу Сообщение Все экземпляры потребителей относятся к группе потребления, поэтому в заявлениях о потреблении нет общего сообщения.
Когда происходит сбой потребления сообщения,RocketMQ автоматически повторит отправку сообщения. И если сообщение превышает максимальное количество повторов,RocketMQСразу会认дляэтотличное Возникла проблема с сообщением. Однако RocketMQ не отбросит проблемное сообщение немедленно, а отправит его по этому адресу. сообщение группа потребителей соответствует особого рода повороту:повороту недоставленных писем。очередь недоставленных письмаиз зовут %DLQ%+ConsumGroup 。
очередь недоставленных писемиметьк Более низкие характеристики:
Кафка — это распределенный、Поддержка разделения、несколько копийиз,На основе ZooKeeperкоординацияизраспределенныйинформациясистема。
Новая версия Kafka больше не требует ZooKeeper.
Его самая большая особенность заключается в том, что он может обрабатывать большие объемы данных в режиме реального времени для удовлетворения различных сценариев спроса: например, система пакетной обработки на основе Hadoop.、Низкая задержка в системе реального времени、Storm/Sparkпотоковый движок,Журналы Web/Nginx, журналы доступа,Службы обмена сообщениями и т. д.,использоватьНаписан на языке Scala.。принадлежатьApacheфундаментиз Топ проектов с открытым исходным кодом。
Давайте сначала взглянем на архитектурную диаграмму Кафки:
Основные концепции Кафки
В Kafka есть несколько основных концепций:
Тему, раздел и брокер можно понимать так:
тема,Представляет логический набор бизнес-данных.,Сравнивать Если в заказе размещены сообщения об операциях, связанных с заказом,Помещайте сообщения о действиях, связанные с пользователем, в пользовательскую тему.,Для крупных сайтов,Внутренние данные огромны,Сообщения о заказах, вероятно, будут очень большими по объему.,Сравнивать Если есть сотни G или даже достичь уровня ТБ,Если вы поместите так много данных на одну машину, обязательно возникнет проблема с ограничением емкости.,Затем вы можете разделить несколько разделов внутри темы, чтобы хранить данные в срезах.,Разные разделы могут быть расположены на разных машинах.,ЭквивалентноРаспределенное хранилище。Каждый Запуск на всех машинаходининдивидуальныйKafkaизпроцессBroker。
В кластере Kafka будет один или несколько Брокеров, один из которых будет выбран контроллером (Kafka Controller), под которым можно понимать Брокера-Лидера. Он отвечает за управление состоянием всех разделов и реплик в целом. кластер.
Partition-Leader
Механизм выбора контроллера
Когда кластер Kafka запускается, процесс выбора заключается в том, что каждый брокер в кластере попытается создать временный узел /controller в ZooKeeper. ZooKeeper обеспечит успешное создание одного и только одного брокера, и этот брокер станет главным контроллером. кластера.
Когда брокер этой роли контроллера выйдет из строя, временный узел ZooKeeper исчезнет. Другие брокеры в кластере всегда будут контролировать этот временный узел. Если они обнаружат, что временный узел исчез, они будут соревноваться за создание временного узла снова, что приведет к его исчезновению. — это механизм выборов, о котором мы упоминали выше. ZooKeeper гарантирует, что Брокер станет новым Контролером. Брокер со статусом контролера требует на одну ответственность больше, чем другие обычные брокеры. Конкретные детали заключаются в следующем:
Механизм выбора лидера копии раздела
Когда Контроллер обнаруживает, что Брокер, на котором находится лидер раздела, не работает, Контроллер выберет первого Брокера в качестве Лидера из списка ISR (в соответствии с параметром unclean.leader.election.enable=false) (первый Брокер сначала помещается в список ISR) таблица, которая может быть репликой с наиболее синхронизированными данными). Если параметр unclean.leader.election.enable имеет значение true, это означает, что когда все реплики в списке ISR отключены, лидер может быть выбран из реплик, отличных от Список ISR. Этот параметр может повысить удобство использования, но новый выбранный лидер может содержать гораздо меньше данных. Есть два условия для попадания реплики в список ISR:
Механизм офсетной записи сообщений о потреблении потребителями
Каждый потребитель будет регулярно отправлять смещение своего раздела потребления во внутреннюю тему Kafka: Consumer_offsets. При отправке ключом является ConsumerGroupId+topic+номер раздела, а значением является значение текущего смещения. Kafka будет регулярно очищать сообщения в нем. Тема и, наконец, Сохраняйте последние данные.
Поскольку __consumer_offsets может получать запросы с высокой степенью одновременности, Kafka по умолчанию выделяет для него 50 разделов (можно установить через offsets.topic.num.partitions), чтобы он мог противостоять большому параллелизму путем добавления компьютеров.
Ребалансировка означает, что если количество потребителей в группе потребителей изменится или количество потребляемых разделов изменится, Kafka перераспределит отношения между потребителями и потребительскими разделами. Например, если потребитель в группе потребителей умрет, назначенные ему разделы будут автоматически переданы другим потребителям. Если он перезапустится, некоторые разделы будут ему возвращены.
Примечание. Перебалансировка предназначена только для подписки, в которой не указано использование разделов. Если разделы указаны через назначение, Kafka не будет выполнять перебалансировку.
Следующие ситуации могут вызвать ребалансировку потребителей:
Во время процесса ребалансировки потребители не могут получать сообщения от Kafka, что повлияет на TPS Kafka. Если в кластере Kafka много узлов, например сотни, ребалансировка может занять очень много времени, поэтому старайтесь избегать пиковой ребалансировки. .
Процесс ребалансировки выглядит следующим образом.
Когда потребитель присоединяется к группе потребителей, потребитель, группа потребителей и координатор группы проходят следующие этапы:
Этап первый: выбор координатора группы
Координатор группы: каждая группа потребителей выберет брокера в качестве координатора своей группы, который отвечает за мониторинг сердечного ритма всех потребителей в группе потребителей, определение наличия простоев, а затем запуск ребалансировки потребителей. Когда каждый потребитель в группе потребителей запускается, он отправляет запрос FindCoordinatorRequest узлу в кластере Kafka, чтобы найти соответствующий координатор группы GroupCoordinator и установить с ним сетевое соединение. Метод выбора координатора группы. Для выбора раздела __consumer_offsets, которому должно быть отправлено смещение, потребляемое потребителем, можно использовать следующую формулу. Брокер, соответствующий лидеру этого раздела, является формулой координатора этой группы потребителей:
hash(идентификатор группы потребителей) % Количество разделов, соответствующих теме
Второй этап: Вступайте в группу потребителей JOIN GROUP.
После успешного обнаружения GroupCoordinator, соответствующего группе потребителей, он переходит к этапу присоединения к группе потребителей. На этом этапе потребитель отправит запрос JoinGroupRequest в GroupCoordinator и обработает ответ. Затем координатор группы выбирает первого потребителя из группы потребителей, присоединившегося к группе, в качестве лидера (координатора группы потребителей), отправляет информацию о группе потребителей лидеру, а затем лидер отвечает за формулирование плана разделения.
Третий этап (SYNC GROUP)
Лидер-потребитель отправляет запрос SyncGroupRequest в GroupCoordinator, а затем GroupCoordinator выдает план раздела каждому потребителю. Они будут выполнять сетевые подключения и потребление сообщений в соответствии с Leader Broker указанного раздела.
Стратегия распределения разделов потребительской ребалансировки
Существует три основные стратегии ребаланса: диапазон 、 round-robin 、 sticky 。По умолчанию стратегия назначается диапазону.。
Предположим, что в топике 10 разделов (0-9), и теперь есть три потребителя:
Стратегия диапазона: Распределение в порядке согласно серийному номеру раздела. Предположим, что n = количество разделов / количество потребителей = 3, m = количество разделов % количество потребителей = 1, тогда каждому из первых m потребителей будет выделено n+1. разделы и следующие (Количество потребителей — m). Каждому потребителю выделяется n разделов. Например, разделы 0–3 предоставляются одному потребителю, разделы 4–6 — одному потребителю, а разделы 7–9 — одному потребителю.
Стратегия циклического перебора: распределение по циклическому принципу, например, разделы 0, 3, 6 и 9 передаются одному потребителю, разделы 1, 4 и 7 — одному потребителю, а разделы 2, 5 и 8 — отдается одному потребителю.
«Жесткая» стратегия: первоначальная стратегия распределения аналогична циклической, но во время ребалансировки необходимо обеспечить соблюдение следующих двух принципов:
Когда двое конфликтуютчас,Нет.одининдивидуальный目标优先ВНет.二индивидуальный目标 . Это может в наибольшей степени сохранить исходную стратегию распределения разделов. Например, для распределения в ситуации первого диапазона, если третий потребитель повесит трубку, результат повторного использования стратегии закрепления будет следующим: потребитель1, кроме исходного 0~ 3, будет выделено еще 7 потребитель2 В дополнение к исходному 4~ 6, 8 и 9 будут распределены снова.
Анализ механизма публикации сообщений продюсера
1. Метод письма
Производитель использует режим push для публикации сообщений брокеру.,Каждое сообщение добавляется к паттерну.,принадлежать Запись на диск последовательно(Запись на диск последовательно Сравнивать писать случайно КПД должен быть высоким, а гарантия kafka Пропускная способность)。
2. Маршрутизация сообщений
Когда производитель отправляет сообщение брокеру, он выбирает, в каком разделе его хранить, на основе алгоритма разделения. Его механизм маршрутизации:
хэш(ключ)%количество разделов
3. Процесс написания
HW широко известен как верхний предел, сокращение от High Watermark. Наименьший LEO (смещение конца журнала) в ISR, соответствующий разделу, принимается за HW. Потребитель может потреблять только до местоположения HW. . Кроме того, каждая реплика имеет аппаратное обеспечение, а лидер и ведомый несут ответственность за обновление статуса своего собственного аппаратного обеспечения. Для сообщений, недавно написанных лидером, потребитель не может использовать их немедленно. Лидер будет ждать, пока сообщение будет синхронизировано репликами во всех ISR, а затем обновит аппаратное обеспечение. В это время сообщение может быть использовано потребителем. Это гарантирует, что в случае сбоя брокера, в котором находится лидер, сообщение все равно можно будет получить от вновь избранного лидера. Для запросов на чтение от внутренних брокеров аппаратных ограничений нет.
Данные сообщений раздела Kafka хранятся в папке, названной именем темы + номер раздела. Сообщения хранятся в сегментах внутри раздела. Сообщения каждого сегмента хранятся в разных файлах журналов. Максимальный размер файла журнала определяется Kafka. размер сегмента — 1Г. Цель этого ограничения — облегчить загрузку файла журнала в память для работы:
1 ### Часть индексного файла сообщения со смещением. Каждый раз, когда Kafka отправляет сообщения размером 4 КБ (настраиваемые) в раздел, оно записывает текущее сообщение со смещением в индексный файл.
2 ### Если вы хотите найти сообщение, isoffset сначала быстро найдет его в этом файле, а затем перейдет к файлу журнала, чтобы найти конкретное сообщение.
3 00000000000000000000.index
4 ### Файл хранения сообщений, в основном хранит смещение и тело сообщения.
5 00000000000000000000.log
6 ### Сообщение отправляется в файл индекса времени. Каждый раз, когда Kafka отправляет в раздел сообщение размером 4 КБ (настраиваемое), он записывает текущее сообщение и отправляет метку времени и соответствующее смещение в файл индекса времени.
7 ### Если вам нужно найти сообщение со смещением по времени, сначала оно будет искаться в этом файле.
8 00000000000000000000.timeindex
9
10 00000000000005367851.index
11 00000000000005367851.log
12 00000000000005367851.timeindex
13
14 00000000000009936472.index
15 00000000000009936472.log
16 00000000000009936472.timeindex
Это число, например 9936472, представляет собой начальное смещение, содержащееся в этом файле сегмента журнала. Это означает, что в этот раздел записано как минимум около 10 миллионов фрагментов данных. В Kafka Broker есть параметр log.segment.bytes, который ограничивает размер каждого файла сегмента журнала. Максимальный размер — 1 ГБ. Когда файл сегмента журнала заполнен, для записи автоматически открывается новый файл сегмента журнала, чтобы предотвратить слишком большой размер отдельного файла и влияние на производительность чтения и записи файла. Этот процесс называется сменой журнала и файлом сегмента журнала. записываемый сегмент называется активным сегментом журнала.
Наконец, прикрепите диаграмму данных узла ZooKeeper.
RabbitMQ:
Сохранение базы данных:
1. Выполните данные бизнес-заказа и сгенерируйте сообщение «становитсяизсообщение» для операции «Выносливость» (обычно вставляется в базу данных, здесь, если база данных разделена, могут быть задействованы распределенные транзакции)
2. Отправьте сообщение на сервер брокера.
3. С помощью механизма RabbitMQizConfirm на стороне производителя отслеживать, подтверждает ли сервер подтверждение.
4. Если получено подтверждение, обновите статус данных сообщения на «отправлено». В случае сбоя измените его на статус сбоя.
5. Распределенная запланированная задача запрашивает базу данных в течение 3 минут (это конкретное время следует определять исходя из своевременности) перед отправкой сообщения о сбое.
6. Отправьте сообщение повторно и запишите количество отправлений.
7. Если после слишком частой отправки по-прежнему происходит сбой, необходимо устранить неполадки вручную или выполнить другие операции.
Преимущества: Это может гарантировать, что сообщение не будет потеряно на 100%.
Недостатки: Первый шаг будет связан с проблемами распределенных транзакций.
Задержка доставки сообщений:
На блок-схеме разные цвета обозначают разные сообщения.
1. Разместить заказ Выносливость
2. Отправьте сообщение брокеру (называемое основным сообщением), а затем отправьте то же сообщение другому коммутатору (это сообщение называется сообщением подтверждения).
3. После того, как основное сообщение использовано фактической стороной бизнес-обработки, генерируется ответное сообщение. Прежде чем подтвердить сообщение за сообщением Обработка приложения службы хранится в базе данных.
4~6. Фактическая сторона бизнес-обработки отправляет сообщение с подтверждением. После того как Служба получает его, она изменяет исходный статус сообщения.
7. Если сообщение не подтверждено, весь процесс будет повторно выполнен производителем посредством вызова RPC.
Преимущества: Скорость ответа улучшена по сравнению с решением с сохранением данных.
Недостатки: сложность системы немного высока. Если оба сообщения терпят неудачу и сообщения теряются, для компенсации все равно необходим механизм подтверждения.
RocketMQ
Производитель теряет данные:
Производитель находится в процессе отправки сообщения брокеру.,Потеряно из-за проблем с сетью и т. д.,или Сообщение прибыло брокеру,но что-то пошло не так,Не сохранилось. Для этой проблемы,RocketMQверноProducerотправлятьинформациянабор3способ:
Отправить синхронно
Отправить асинхронно
Отправка в одну сторону
Брокер потерял данные:
Брокер получил Сообщение и временно сохранил его в памяти. Прежде чем Потребитель успел его усвоить, Брокер умер.
Это можно решить с помощью настроек постоянства:
После этих двух шагов, даже если Брокер повесит трубку, Производитель точно не получит подтверждение и сможет отправить его повторно.
Потребители теряют данные:
Потребитель использовал Сообщение, но возникла внутренняя проблема, и Сообщение еще не было обработано. Брокер считает, что Потребитель завершил его обработку и будет отправлять только последующие сообщения. В это время необходимо Отключите автоподтверждение и выполните подтверждение вручную после обработки сообщения. , Сообщения, которые не удалось обработать несколько раз, будут добавлены очередь недоставленных писем , в данный момент требуется ручное вмешательство.
Производитель теряет данные
Если установлено значение acks=all, оно не будет потеряно. Требуется, чтобы ваш лидер получил сообщение и все последователи синхронизировали сообщение, прежде чем запись будет считаться успешной. Если это условие не выполнено, производитель будет автоматически и непрерывно повторять попытки неограниченное количество раз.
Брокер потерял данные
Брокер Кафки уходит, а затем переизбирается лидер раздела. Подумайте об этом: если у других ведомых есть какие-то данные, которые в это время не синхронизированы, и лидер в это время кладет трубку, а затем выбирает ведомого в качестве лидера, не будут ли некоторые данные отсутствовать? Это приведет к потере некоторых данных.
В это время обычно требуется установить как минимум следующие четыре параметра:
replication.factor
min.insync.replicas
acks=all
retries=MAX
Наша производственная среда настроена в соответствии с вышеуказанными требованиями, по крайней мере, после этой настройки. Kafka broker терминал может быть гарантированно находится в leader расположение broker Если возникла неисправность,продолжайте leader Переключите время, данные не будут потеряны.
Потребители теряют данные
Вы потребляете это сообщение, а затем потребитель автоматически отправляет смещение, заставляя Kafka думать, что вы использовали сообщение, но на самом деле вы только что подготовились к обработке сообщения, и прежде чем обработать его, вы вешаете трубку. Сообщение будет потеряно.
Речь идет не о RabbitMQ Почти все знают Kafka Будет отправлено автоматически смещение, то до тех пор, пока Отключить автоматическую отправку смещение, отправьте его вручную после обработки смещение может гарантировать, что данные не будут потеряны. Однако в это время все еще может произойти повторное потребление. Если вы только что завершили обработку и еще не отправили его, offset, в результате он зависает. В это время он обязательно будет израсходован снова. Просто обеспечьте идемпотентность.
Если взять в качестве примера RocketMQ, то сценарии дублирования сообщений перечислены ниже:
Дублируются сообщения при отправке
Когда сообщение было успешно отправлено на сервер и завершено сохранение, происходит сбой в сети или клиент не работает, в результате чего сервер не может ответить клиенту. Если производитель поймет, что сообщение не удалось отправить, и попытается отправить сообщение еще раз, потребитель впоследствии получит два сообщения с одинаковым содержанием и тем же идентификатором сообщения.
Дублирование сообщения во время доставки
В контексте потребления сообщений,Сообщение доставлено потребителю, и бизнес-обработка завершена.,Когда клиент отвечает серверу, сеть прерывается. Чтобы гарантировать, что сообщение будет использовано хотя бы один раз,информацияочередьRocketMQверсияиз Сервер Волясуществоватьвосстановление сетиназад再次尝试投递之前已被иметь дело с过изинформация,Впоследствии потребитель получит два сообщения с одинаковым содержанием.
балансировка дублирование сообщений о нагрузке (включая, помимо прочего, джиттер сети, перезапуск брокера) также перезапуск потребительского приложения)
Когда сообщение версии RocketMQ из Broker или клиента перезапускается, расширяется или сжимается, будет запущена перебалансировка. В это время потребитель может получать повторяющиеся сообщения.
Итак, каковы решения? Прямо над картинкой.
По данному вопросу следует учитывать несколько моментов:
Как быстро израсходовать накопившиеся сообщения?
Временно напишите потребитель распределения сообщений, чтобы равномерно распределить сообщения в очереди отставания по N очередям. При этом одна очередь соответствует одному потребителю, что эквивалентно увеличению скорости потребления в N раз.
До модификации:
После модификации:
Невыполненная работа слишком велика, поэтому срок действия некоторых сообщений истек. Что мне делать?
Пакетное перенаправление. Когда бизнес не занят,Сравниватьнапример, раннее утро,Подготовьте процедуры заранее,Найдите недостающую партию сообщений,Повторный импорт в MQ.
Имеется большое количество сообщений, MQ-диск переполнен, новые сообщения не могут поступить, большое количество сообщений теряется. Как с этим бороться?
Обойти это невозможно. Кто заставляет [потребителя распространения сообщений] писать слишком медленно? Вы пишете временную программу, получаете доступ к данным для использования, потребляете одно и выбрасываете другое, а затем быстро потребляете все сообщения? Тогда выберите второй вариант и пополните данные ночью.