Обеспечение порядка сообщений в Kafka: стратегия и настройка
Обеспечение порядка сообщений в Kafka: стратегия и настройка

1. Обзор

В этой статье мы рассмотрим проблемы и решения, связанные с упорядочиванием сообщений в Apache Kafka. В распределенных системах обработка сообщений в правильном порядке имеет решающее значение для поддержания целостности и согласованности данных. Хотя Kafka предоставляет механизмы для поддержания порядка сообщений, реализация этого в распределенной среде имеет свои сложности.

2. Порядок внутри раздела и его проблемы

Kafka поддерживает порядок внутри одного раздела, присваивая каждому сообщению уникальное смещение. Это гарантирует последовательное добавление сообщений внутри раздела. Однако когда мы масштабируем и используем несколько разделов, поддержание глобального порядка становится сложным. Разные разделы получают сообщения с разной скоростью, что усложняет строгое упорядочение между разделами.

2.1 Выбор времени производителей и потребителей

Давайте поговорим о том, как Kafka обрабатывает порядок сообщений. Существуют некоторые различия между порядком, в котором производитель отправляет сообщения, и порядком, в котором их получает потребитель. Придерживаясь одного раздела, мы можем обрабатывать сообщения в том порядке, в котором они поступают к брокеру. Однако этот порядок может не совпадать с тем порядком, в котором мы их изначально отправили. Эта путаница может возникнуть из-за задержек в сети или из-за повторной отправки сообщений. Для обеспечения согласованности мы можем реализовать производителей с подтверждениями и повторными попытками. Таким образом, мы гарантируем, что сообщения не только поступят в Kafka, но и придут в правильном порядке.

2.2 Проблемы с несколькими разделами

Такое распределение по разделам, хотя и полезно для масштабируемости и отказоустойчивости, но усложняет реализацию глобального порядка сообщений. Например, мы отправляем два сообщения, M1 и M2, последовательно. Kafka получает их так же, как мы их отправили, но помещает в разные разделы. Проблема здесь в том, что то, что M1 отправляется первым, не означает, что оно будет обработано раньше M2. Это может быть затруднительно в ситуациях, когда порядок обработки имеет решающее значение, например при финансовых транзакциях.

2.3 Последовательность сообщений одного раздела

Мы создали тему с именем «single_partition_topic», имеющую один раздел, и тему с именем «multi_partition_topic», имеющую 5 разделов. Вот пример темы с одним разделом, которому производитель отправляет сообщения:

UserEvent является реализованным Comparable Интерфейс POJO класс, помогает прессовать globalSequenceNumber(внешний серийный номер)Сортировка классов сообщений。потому чтопродюсертолькосуществоватьотправлять POJO Объект сообщения, мы реализовали собственный Jackson Сериализатор и десериализатор.

Раздел 0 получает все пользовательские события, причем идентификаторы событий отображаются в следующем порядке:

существовать Kafka , каждая группа потребителей действует как независимая организация. Если два потребителя принадлежат к разным группам потребителей, они оба получат все сообщения по теме. Это потому, что Кафка рассматривает каждую группу потребителей как отдельного подписчика.

Если два потребителя принадлежат к одной группе потребителей и подписаны на тему с несколькими разделами,Кафка обеспечит Каждый потребитель читает данные из уникального набора разделов.。Это сделано для того, чтобы обеспечить одновременную обработку сообщений.。

Кафка гарантирует, что существование находится в пределах потребительской группы.,Никакие два потребителя не читают одно и то же сообщение,Поэтому каждое сообщение существует обрабатывается только один раз для каждой группы.

Следующий код является примером того, как один и тот же потребитель получает сообщения из той же темы:

существуют В этом случае,Результаты, которые мы получаем, показывают, что потребитель потребляет сообщения в том же порядке.,Вот последовательные идентификаторы событий из выходных данных:

2.4 Последовательность многораздельных сообщений

Для тем с несколькими разделами конфигурация потребителей и производителей одинакова. Единственная разница заключается в теме и разделе, в который отправляется сообщение. Производитель отправляет сообщения в тему «multi_partition_topic»:

Потребитель потребляет сообщения одной и той же темы:

В выходных данных производителя перечислены идентификаторы событий и соответствующие им разделы следующим образом:

Для потребителей выходные данные покажут, что потребители потребляют сообщения в разном порядке. Идентификаторы событий в выходных данных следующие:

3.1 Использование одного раздела

мы можемсуществовать Kafka Используйте один раздел в 'single_partition_topic' Как показано в примере, это обеспечивает порядок сообщений. Однако у этого подхода есть свои недостатки:

  • Предел пропускной способности: представьте, что мы существуем в оживленной пиццерии. Если у нас есть только один шеф-повар (продюсер) и один официант (потребитель) (Раздел), работающие за одним столом, они существуют и могут обслуживать только такое количество пиццы, прежде чем все начнет накапливаться. существовать Kafka В мире, когда мы имеем дело с большим количеством сообщений, придерживаться одного Раздела похоже на сценарий с одной таблицей. В сценариях существования с большим объемом одна-единственная секция становится узким местом, а скорость обработки сообщений ограничена, поскольку только один производитель и один потребитель могут работать в одном существе одновременно.
  • Уменьшите параллелизм: в приведенном выше примере, если у нас есть несколько поваров (продюсер) и официантов (потребителей), работающих за несколькими столами (Раздел), то количество выполненных заказов увеличится. Кафка Преимуществасуществоватьчерез несколько Раздел Параллельная обработка。только один Раздел,Это преимущество теряется,что приводит к последовательной обработке,и еще больше ограничивает поток сообщений.

По сути,Один раздел гарантирует упорядочение за счет снижения пропускной способности.

3.2 Внешняя сортировка и буферизация временного окна

существуют в этом методе,производитель помечает каждое сообщение глобальным порядковым номером. Несколько экземпляров потребителей одновременно потребляют сообщения из разных разделов.,и используйте эти порядковые номера, чтобы изменить порядок сообщений,для обеспечения глобального порядка.

существуют реальные сценарии с несколькими продюсерами,Мы будем управлять глобальной последовательностью через общий ресурс, доступный всем процессам-производителям, например, последовательность базы данных или распределенный счетчик.。Это гарантирует, что серийный номерсуществовать Единственное среди всех сообщенийиупорядоченный,Независимо от того, какой производитель их присылает:

существовать Потребительская сторона,Группируем сообщения по временным окнам,Затем обработайте их по порядку. Мы существуем, сообщения, поступающие в течение определенного периода времени, объединяем их в группы.,По истечении срока действия окна,Обрабатываем партию. Это обеспечивает упорядоченную обработку в течение этого периода времени.,Даже если у них разное время прибытия в пределах окна. Потребитель буферизует сообщения на основе порядковых номеров и меняет их порядок перед обработкой. Нам необходимо убедиться, что сообщения обрабатываются в правильном порядке.,с этой целью,У потребителей должен быть буферный период,существуют Опросите сообщение несколько раз перед обработкой буферизованного сообщения.,И этот буферный период достаточно длинный,Чтобы устранить потенциальные проблемы с сортировкой сообщений:

Идентификатор каждого события существует в выходных данных вместе с соответствующим ему разделом, как показано ниже:

Потребительские выходные данные с глобальным порядковым номером и идентификатором события:

3.3 Рекомендации по внешней сортировке и буферизации

существуют в этом методе,Каждый экземпляр потребителя буферизует сообщения.,и обрабатывать их по порядку в зависимости от их серийных номеров. Однако,Есть некоторые соображения:

  • Размер буфера. Размер буфера можно увеличить в зависимости от количества входящих сообщений. существования отдает приоритет реализациям, которые строго сортируются по порядковому номеру,Мы можем увидеть значительный рост буферов,Особенно если есть задержка доставки сообщения. Например,Если мы обрабатываем 100 элементов в минуту, мы будем обрабатывать 200 элементов в минуту.,Буфер неожиданно вырастет. поэтому,Мы должны эффективно управлять размером буфера,И подготовьте стратегию на случай, если существование превысит ожидаемые пределы.
  • Задержка: когда мы буферизуем сообщения,Фактически мы заставляем их ждать некоторое время перед обработкой (вводя задержку). с одной стороны,С другой стороны, это помогает нам оставаться организованными;,Это замедляет весь процесс. Главное — найти правильный баланс между поддержанием порядка и минимизацией задержек.
  • Сбой: в случае сбоя потребителя мы можем потерять буферизованные сообщения. Чтобы предотвратить это, нам может потребоваться периодически сохранять состояние буферизации.
  • Поздние сообщения: Сообщения, поступающие после окна обработки существования, будут не по порядку. В зависимости от варианта использования нам могут потребоваться политики для обработки или удаления таких сообщений.
  • Управление состоянием. Если обработка включает в себя операции с отслеживанием состояния, нам потребуются механизмы для управления и сохранения состояния в разных окнах.
  • Использование ресурсов: существующие буферы требуют памяти для хранения большого количества сообщений. Нам нужно убедиться, что у нас достаточно ресурсов, чтобы справиться с этим.,Особенно, если сообщение существует и находится в буфере длительное время.

3.4 Идемпотентные производители

Kafka Функциональность идемпотентного производителя предназначена для доставки сообщений только один раз, что предотвращает любое дублирование. Это существованиепродюсер может иметь решающее значение в ситуациях, когда повторная попытка отправки сообщений может произойти из-за сетевых ошибок или других временных сбоев. Основная цель идемпотентности — предотвратить дублирование сообщений, но она косвенно влияет на порядок сообщений. Кафка Используйте две вещи для достижения идемпотентности: производители ID (PID) и служит серийным номером идемпотентного ключа, уникального в контексте конкретного Раздела.

  • Серийный номер: Кафка Каждому сообщению, отправленному продюсером, присваивается порядковый номер. Эти порядковые номера уникальны в каждом разделе, что гарантирует, что производитель отправляет сообщения в определенном порядке существования. Kafka При приеме существование в пределах одного Раздела пишется в том же порядке. Серийный номер гарантирует заказ в пределах одного раздела. Однако, когда существование создает сообщения для нескольких Разделов, не существует гарантии глобального порядка между Разделами. Например, если продюсер отправит сообщение M1、M2 и M3 Отправлять в разделы отдельно P1、P2 и P3,Тогда каждое сообщение существует и получает уникальный порядковый номер в своем Разделе. Однако,Это не гарантирует относительный порядок потребления этих Разделов.
  • Идентификатор производителя (PID). Когда идемпотентность включена, брокер назначает каждому производителю уникальный идентификатор производителя (PID). Этот PID в сочетании с порядковым номером позволяет Kafka идентифицировать и отбрасывать любые повторяющиеся сообщения из-за повторных попыток производителя.

Kafka Порядок сообщений гарантируется записью сообщений в раздел в производственном порядке благодаря порядковым номерам и PID и идемпотентные функции для предотвращения дублирования. Чтобы включить идемпотентного производителя, нам нужна конфигурация существующегопродюсера. “enable.idempotence” Свойство настроено на true:

4. Ключевые конфигурации производителей и потребителей

Существует несколько ключевых конфигураций производителя и потребителя Kafka, которые могут повлиять на порядок сообщений и пропускную способность.

4.1#### 4.1 Конфигурация производителя
  • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION: Если мы отправим большое количество сообщений, Кафка Этот параметр помогает определить, сколько сообщений мы можем отправить, не дожидаясь подтверждения о прочтении. Если мы установим это значение выше, чем 1 Без включения идемпотентности мы можем нарушить порядок сообщений, если нам понадобится их повторно отправить. Однако если мы включим идемпотентность, Кафка Даже если мы отправляем много сообщений одновременно, порядок сообщений сохраняется. Если нам нужен очень строгий порядок, например, чтобы гарантировать, что каждое существующее сообщение будет прочитано до отправки следующего сообщения, мы должны установить это значение равным 1. Если бы мы хотели поставить скорость выше идеального порядка, мы могли бы установить 5, но это может привести к проблемам с упорядочиванием.
  • BATCH_SIZE_CONFIG и LINGER_MS_CONFIG: Kafka Управляет размером пакета по умолчанию (в байтах) с целью группировки записей из одного раздела в меньшее количество запросов для повышения производительности. Если мы установим этот лимит слишком низким, мы отправим много маленьких групп, что может замедлить нашу работу. Но если мы установим слишком высокое значение, это может оказаться не лучшим использованием памяти. Кафка Вы можете подождать некоторое время перед отправкой группы, если она еще не заполнена. Это время ожидания определяется выражением LINGER_MS_CONFIG контроль. Если больше сообщений поступает достаточно быстро, чтобы заполнить установленный нами лимит, они отправляются немедленно, в противном случае Kafka Не буду больше ждать - это будетсуществоватькогда время вышлоотправлятьвсе, что у нас есть。это каксуществоватьскоростьибаланс между эффективностью,Убедитесь, что мы отправляем достаточно сообщений одновременно,без ненужных задержек.
Язык кода:properties
копировать
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
4.2 Конфигурация потребителя
  • MAX_POLL_RECORDS_CONFIG: это Kafka Ограничение количества записей, которые потребитель может получить за один запрос данных. Если мы установим это число большим, мы сможем поглощать большие объемы данных одновременно, увеличивая нашу пропускную способность. но есть загвоздка - Чем больше мы приобретаем, тем труднее может быть поддерживать все в порядке. Итак, нам нужно найти ту золотую середину, где мы эффективны, но не перегружены.
  • FETCH_MIN_BYTES_CONFIG: Если мы установим это число очень большим, Кафка будет ждать, пока у него не будет достаточно данных, чтобы удовлетворить наше минимальное количество байтов, прежде чем отправлять его. Это может означать меньшее количество поездок (или приобретений), что хорошо для эффективности. Но если мы спешим и хотим быстро получить данные, мы можем установить это число меньше, чтобы Kafka будет быстрееотправлятьвсе, что у него есть。Например,Если наше потребительское приложение требует ресурсов или требует строгого порядка сообщений,Особенно в случае многопоточности,Меньшие партии могут быть более выгодными.
  • FETCH_MAX_WAIT_MS_CONFIG: Это определит, чего ждут наши потребители. Kafka Соберите достаточно данных, чтобы удовлетворить наши FETCH_MIN_BYTES_CONFIG время。Если мы установим это время очень высоким,Наши потребители готовы ждать дольше,Можно получить больше данных одновременно. Но если мы будем действовать в спешке,Мы ставим его ниже,Таким образом, наши потребители будут получать данные быстрее, даже если у них не так много данных. Это компромисс между ожиданием большей прибыли и быстрыми действиями.
Язык кода:properties
копировать
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

5. Заключение

существоватьв этой статье,Мы углубились в глубину Kafka Сложность упорядочивания сообщений в . Мы изучаем проблемы и предлагаем решения. Будь то один раздел, внешняя сортировка и буферизация временных окон или идемпотентные производители, Kafka Предоставляет индивидуальные решения для удовлетворения потребностей в сортировке сообщений.

boy illustration
Учебное пособие по Jetpack Compose для начинающих, базовые элементы управления и макет
boy illustration
Код js веб-страницы, фон частицы, код спецэффектов
boy illustration
【новый! Суперподробное】Полное руководство по свойствам компонентов Figma.
boy illustration
🎉Обязательно к прочтению новичкам: полное руководство по написанию мини-программ WeChat с использованием программного обеспечения Cursor.
boy illustration
[Забавный проект Docker] VoceChat — еще одно приложение для мгновенного чата (IM)! Может быть встроен в любую веб-страницу!
boy illustration
Как реализовать переход по странице в HTML (html переходит на указанную страницу)
boy illustration
Как решить проблему зависания и низкой скорости при установке зависимостей с помощью npm. Существуют ли доступные источники npm, которые могут решить эту проблему?
boy illustration
Серия From Zero to Fun: Uni-App WeChat Payment Practice WeChat авторизует вход в систему и украшает страницу заказа, создает интерфейс заказа и инициирует запрос заказа
boy illustration
Серия uni-app: uni.navigateЧтобы передать скачок значения
boy illustration
Апплет WeChat настраивает верхнюю панель навигации и адаптируется к различным моделям.
boy illustration
JS-время конвертации
boy illustration
Обеспечьте бесперебойную работу ChromeDriver 125: советы по решению проблемы chromedriver.exe не найдены
boy illustration
Поле комментария, щелчок мышью, специальные эффекты, js-код
boy illustration
Объект массива перемещения объекта JS
boy illustration
Как открыть разрешение на позиционирование апплета WeChat_Как использовать WeChat для определения местонахождения друзей
boy illustration
Я даю вам два набора из 18 простых в использовании фонов холста Power BI, так что вам больше не придется возиться с цветами!
boy illustration
Получить текущее время в js_Как динамически отображать дату и время в js
boy illustration
Вам необходимо изучить сочетания клавиш vsCode для форматирования и организации кода, чтобы вам больше не приходилось настраивать формат вручную.
boy illustration
У ChatGPT большое обновление. Всего за 45 минут пресс-конференция показывает, что OpenAI сделал еще один шаг вперед.
boy illustration
Copilot облачной разработки — упрощение разработки
boy illustration
Микросборка xChatGPT с низким кодом, создание апплета чат-бота с искусственным интеллектом за пять шагов
boy illustration
CUDA Out of Memory: идеальное решение проблемы нехватки памяти CUDA
boy illustration
Анализ кластеризации отдельных ячеек, который должен освоить каждый&MarkerгенетическийВизуализация
boy illustration
vLLM: мощный инструмент для ускорения вывода ИИ
boy illustration
CodeGeeX: мощный инструмент генерации кода искусственного интеллекта, который можно использовать бесплатно в дополнение к второму пилоту.
boy illustration
Машинное обучение Реальный бой LightGBM + настройка параметров случайного поиска: точность 96,67%
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция без кодирования и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
LM Studio для создания локальных больших моделей
boy illustration
Как определить количество слоев и нейронов скрытых слоев нейронной сети?
boy illustration
[Отслеживание целей] Подробное объяснение ByteTrack и детали кода