В этой статье мы рассмотрим проблемы и решения, связанные с упорядочиванием сообщений в Apache Kafka. В распределенных системах обработка сообщений в правильном порядке имеет решающее значение для поддержания целостности и согласованности данных. Хотя Kafka предоставляет механизмы для поддержания порядка сообщений, реализация этого в распределенной среде имеет свои сложности.
Kafka поддерживает порядок внутри одного раздела, присваивая каждому сообщению уникальное смещение. Это гарантирует последовательное добавление сообщений внутри раздела. Однако когда мы масштабируем и используем несколько разделов, поддержание глобального порядка становится сложным. Разные разделы получают сообщения с разной скоростью, что усложняет строгое упорядочение между разделами.
Давайте поговорим о том, как Kafka обрабатывает порядок сообщений. Существуют некоторые различия между порядком, в котором производитель отправляет сообщения, и порядком, в котором их получает потребитель. Придерживаясь одного раздела, мы можем обрабатывать сообщения в том порядке, в котором они поступают к брокеру. Однако этот порядок может не совпадать с тем порядком, в котором мы их изначально отправили. Эта путаница может возникнуть из-за задержек в сети или из-за повторной отправки сообщений. Для обеспечения согласованности мы можем реализовать производителей с подтверждениями и повторными попытками. Таким образом, мы гарантируем, что сообщения не только поступят в Kafka, но и придут в правильном порядке.
Такое распределение по разделам, хотя и полезно для масштабируемости и отказоустойчивости, но усложняет реализацию глобального порядка сообщений. Например, мы отправляем два сообщения, M1 и M2, последовательно. Kafka получает их так же, как мы их отправили, но помещает в разные разделы. Проблема здесь в том, что то, что M1 отправляется первым, не означает, что оно будет обработано раньше M2. Это может быть затруднительно в ситуациях, когда порядок обработки имеет решающее значение, например при финансовых транзакциях.
Мы создали тему с именем «single_partition_topic», имеющую один раздел, и тему с именем «multi_partition_topic», имеющую 5 разделов. Вот пример темы с одним разделом, которому производитель отправляет сообщения:
UserEvent является реализованным Comparable Интерфейс POJO класс, помогает прессовать globalSequenceNumber(внешний серийный номер)Сортировка классов сообщений。потому чтопродюсертолькосуществоватьотправлять POJO Объект сообщения, мы реализовали собственный Jackson Сериализатор и десериализатор.
Раздел 0 получает все пользовательские события, причем идентификаторы событий отображаются в следующем порядке:
существовать Kafka , каждая группа потребителей действует как независимая организация. Если два потребителя принадлежат к разным группам потребителей, они оба получат все сообщения по теме. Это потому, что Кафка рассматривает каждую группу потребителей как отдельного подписчика.。
Если два потребителя принадлежат к одной группе потребителей и подписаны на тему с несколькими разделами,Кафка обеспечит Каждый потребитель читает данные из уникального набора разделов.。Это сделано для того, чтобы обеспечить одновременную обработку сообщений.。
Кафка гарантирует, что существование находится в пределах потребительской группы.,Никакие два потребителя не читают одно и то же сообщение,Поэтому каждое сообщение существует обрабатывается только один раз для каждой группы.
Следующий код является примером того, как один и тот же потребитель получает сообщения из той же темы:
существуют В этом случае,Результаты, которые мы получаем, показывают, что потребитель потребляет сообщения в том же порядке.,Вот последовательные идентификаторы событий из выходных данных:
Для тем с несколькими разделами конфигурация потребителей и производителей одинакова. Единственная разница заключается в теме и разделе, в который отправляется сообщение. Производитель отправляет сообщения в тему «multi_partition_topic»:
Потребитель потребляет сообщения одной и той же темы:
В выходных данных производителя перечислены идентификаторы событий и соответствующие им разделы следующим образом:
Для потребителей выходные данные покажут, что потребители потребляют сообщения в разном порядке. Идентификаторы событий в выходных данных следующие:
мы можемсуществовать Kafka Используйте один раздел в 'single_partition_topic' Как показано в примере, это обеспечивает порядок сообщений. Однако у этого подхода есть свои недостатки:
По сути,Один раздел гарантирует упорядочение за счет снижения пропускной способности.。
существуют в этом методе,производитель помечает каждое сообщение глобальным порядковым номером. Несколько экземпляров потребителей одновременно потребляют сообщения из разных разделов.,и используйте эти порядковые номера, чтобы изменить порядок сообщений,для обеспечения глобального порядка.
существуют реальные сценарии с несколькими продюсерами,Мы будем управлять глобальной последовательностью через общий ресурс, доступный всем процессам-производителям, например, последовательность базы данных или распределенный счетчик.。Это гарантирует, что серийный номерсуществовать Единственное среди всех сообщенийиупорядоченный,Независимо от того, какой производитель их присылает:
существовать Потребительская сторона,Группируем сообщения по временным окнам,Затем обработайте их по порядку. Мы существуем, сообщения, поступающие в течение определенного периода времени, объединяем их в группы.,По истечении срока действия окна,Обрабатываем партию. Это обеспечивает упорядоченную обработку в течение этого периода времени.,Даже если у них разное время прибытия в пределах окна. Потребитель буферизует сообщения на основе порядковых номеров и меняет их порядок перед обработкой. Нам необходимо убедиться, что сообщения обрабатываются в правильном порядке.,с этой целью,У потребителей должен быть буферный период,существуют Опросите сообщение несколько раз перед обработкой буферизованного сообщения.,И этот буферный период достаточно длинный,Чтобы устранить потенциальные проблемы с сортировкой сообщений:
Идентификатор каждого события существует в выходных данных вместе с соответствующим ему разделом, как показано ниже:
Потребительские выходные данные с глобальным порядковым номером и идентификатором события:
существуют в этом методе,Каждый экземпляр потребителя буферизует сообщения.,и обрабатывать их по порядку в зависимости от их серийных номеров. Однако,Есть некоторые соображения:
Kafka Функциональность идемпотентного производителя предназначена для доставки сообщений только один раз, что предотвращает любое дублирование. Это существованиепродюсер может иметь решающее значение в ситуациях, когда повторная попытка отправки сообщений может произойти из-за сетевых ошибок или других временных сбоев. Основная цель идемпотентности — предотвратить дублирование сообщений, но она косвенно влияет на порядок сообщений. Кафка Используйте две вещи для достижения идемпотентности: производители ID (PID) и служит серийным номером идемпотентного ключа, уникального в контексте конкретного Раздела.
Kafka Порядок сообщений гарантируется записью сообщений в раздел в производственном порядке благодаря порядковым номерам и PID и идемпотентные функции для предотвращения дублирования. Чтобы включить идемпотентного производителя, нам нужна конфигурация существующегопродюсера. “enable.idempotence” Свойство настроено на true:
Существует несколько ключевых конфигураций производителя и потребителя Kafka, которые могут повлиять на порядок сообщений и пропускную способность.
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
существоватьв этой статье,Мы углубились в глубину Kafka Сложность упорядочивания сообщений в . Мы изучаем проблемы и предлагаем решения. Будь то один раздел, внешняя сортировка и буферизация временных окон или идемпотентные производители, Kafka Предоставляет индивидуальные решения для удовлетворения потребностей в сортировке сообщений.