Принцип противодавления Flink, объясненный простыми словами и идеи решения
Принцип противодавления Flink, объясненный простыми словами и идеи решения

1. Предисловие

Apache Flink — это распределенный механизм обработки больших данных, который может выполнять вычисления с сохранением или без сохранения состояния для ограниченных и неограниченных потоков данных. Его можно развертывать в различных кластерных средах и выполнять быстрые вычисления с данными различных размеров. Поскольку мы обрабатываем потоковые данные, нам приходится сталкиваться с проблемами, вызванными несогласованностью восходящей и нисходящей передачи данных, а также скоростью обработки данных во время расчета потока данных. В этой статье передача данных в Flink впервые рассматривается с точки зрения «модели производитель-потребитель», тем самым вводя концепцию «противодавления». Затем рассказывается, как Flink реализует управление сетевым потоком до «Механизма обратного давления на основе TCP» до версии 1.5 и «Механизма обратного давления на основе кредитов» после версии 1.5. Наконец, анализируется случай противодавления, рассказывается о том, как позиционировать противодавление и оптимизацию ресурсов, а также отображаются результаты настройки. Я надеюсь, что после прочтения этой статьи читатели смогут получить более глубокое представление о концепции противодавления узла Flink и лежащих в его основе принципах. Столкнувшись со сценариями противодавления, они смогут быстро обнаружить узкие места и получить набор основных идей по настройке.

2. Анализ «обратного давления» при передаче данных Flink

2.1 Модель производитель-потребитель

Когда задание Flink выполняется,Данные будут передаваться и обмениваться между различными индивидуальными TaskManager(TM).,Upstream TMприезжать Downstream TMиз передачи данных,можно рассматривать просто как производителя&потребительская модель。 будет представлен ниже Producer и Consumer по пропускной способностидругойчас,приводит к общим проблемам。

гипотеза Производительность производителя 2 МБ/с, потребитель 1 MB/s , скорость, с которой восходящий поток генерирует данные в это время больше, чем Данные нисходящей обработки выполняются быстрее, на обоих концах имеются буферы для временного хранения данных, а скорость передачи базовой сети ниже. 2 MB/s。 Если буфер ограничен, через 5 с Потребитель приличный Receive Buffer будут заполнены, а вновь поступившие данные можно будет только отбросить, но в реальных сценариях производители обычно проверяют перед отправкой данных; buffer доступный статус, если buffer Если он находится в недоступном состоянии, новые данные отправляться не будут. Столкнувшись с вышеперечисленными проблемами, нам необходимо иметь динамическую обратная Механизм связи динамически регулирует скорость отправки и приема данных в соответствии с ситуацией передачи данных в реальном времени, чтобы лучше выполнять передачу по сети. Динамическую обратную связь можно разделить на следующие два типа:

  • Положительный отзыв: когда Производитель присылает оценку меньше, чем Потребитель должен быть уведомлен при принятии тарифа ProducerМожетулучшатьскорость отправки
  • Отрицательный отзыв: Когда производитель присылает оценку больше, чем Потребитель должен быть уведомлен при принятии тарифа ProducerМожетуменьшатьскорость отправки

2.2 Что такое «противодавление»

Введение в предыдущий раздел потоков,Мы понимаем приезжать,Когда данные о добыче и потреблении в нисходящем направлении несовместимы.,вызовет некоторые проблемы,В настоящее время необходим механизм «динамической обратной связи».,Далее вводится понятие «противодавление».

«Противодавление» — об Обработке в стриминговой системе. данныхспособностьиздинамическая обратная Механизм связи, представляющий собой обратную связь от нижестоящего узла, обычно происходит в режиме реального времени. Обработка данныеиз процесса, восходящий участок точкииз скорости производства больше, Чем ниже по течению точкаиз скорости потребления из ситуации.

будет представлен Ниже Во Флинке, как передавать данные между Таск Менеджерами, посмотрите Flink Конкретная форма шаблона производитель-потребитель при передаче данных.

2.3 Передача данных между диспетчерами задач

Следующий рисунок взят из Apache Flink (http://flink.apache.org). Понятия, связанные с этим рисунком: ResultPartition (RP) ResultSubPartition (RS) InputChannel (IC) InputGate (IG)

MapDriver передает данные RecordWriter, затем распределяет данные через ChannelSelector и передает данные одному или нескольким RecordSerializers для сериализации и преобразования в двоичный поток. Для ChannelSelector существует два режима распространения: один — широковещательный, при котором данные отправляются каждому сериализатору для обработки. Другой — выбор в соответствии с некоторой логикой, например вычислением хэша данных и последующей маршрутизацией к попаданию. последовательность.

Язык кода:javascript
копировать
/**
 * The {@link ChannelSelector} determines to which logical channels a record should be written to.
 *
 * @param <T> the type of record which is sent through the attached output gate
 */
public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the channel selector with the number of output channels.
     *
     * @param numberOfChannels the total number of output channels which are attached to respective
     *     output gate.
     */
    void setup(int numberOfChannels);

    /**
     * Returns the logical channel index, to which the given record should be written. It is illegal
     * to call this method for broadcast channel selectors and this method can remain not
     * implemented in that case (for example by throwing {@link UnsupportedOperationException}).
     * 
     * Выберите режим
     *
     * @param record the record to determine the output channels for.
     * @return an integer number which indicates the index of the output channel through which the
     *     record shall be forwarded.
     */
    int selectChannel(T record);

    /**
     * Returns whether the channel selector always selects all the output channels.
     * режим трансляции
     *
     * @return true if the selector is for broadcast mode.
     */
    boolean isBroadcast();
}
  1. Данные сериализованного выходного двоичного потока будут сохранены в буферном блоке, а затем BufferWriter Эти буферные блоки будут записаны в указанный ResultPartition (RP). RP также содержит несколько подразделов ResultSubpartitions (например, RS1, RS2). В каждом подразделе хранятся только данные, необходимые конкретным потребителям. Как видно на картинке, один buffer был BufferWriter Путин RS2 в, в это время RS2 Этот подраздел стал расходуемым, и о нем будет сообщено позже. JobManager。
Язык кода:javascript
копировать
public abstract class ResultPartition implements ResultPartitionWriter {

    protected final ResultPartitionID partitionId;

    /** Тип раздела определяется для реализации с использованием определенных подразделов. */
    protected final ResultPartitionType partitionType;

    protected final ResultPartitionManager partitionManager;

    /** Subpartition изиндивидуальный数 */
    protected final int numSubpartitions;

    // - Runtime state --------------------------------------------------------

    /** ResultPartition средняя буферная зона */
    protected BufferPool bufferPool;
    
}
  1. JobManager буду искать RS2 изConsumer уведомляет TaskManager2 о том, что блок данных можно использовать. Затем InputChannel получит сообщение от «приехать» (изображение IC1, используемый для получения RS2 中из буфер и InputChannel и ResultSubpartition Это 1-1, что соответствует из, один InputChannel получить ResultSubpartition из вывода) и уведомить RS2 Инициализируйте сетевое соединение и начните передачу данных. Затем RS2 проходить TaskManager1 из сетевого стека основе Netty Для передачи данных сетевое соединение устанавливается между каждым диспетчером задач. Между длительным существованием из.
Язык кода:javascript
копировать
/**
 * An input channel consumes a single {@link ResultSubpartitionView}.
 *
 * <p>For each channel, the consumption life cycle is as follows:
 *
 * <ol>
 *   <li>{@link #requestSubpartition()}
 *   <li>{@link #getNextBuffer()}
 *   <li>{@link #releaseAllResources()}
 * </ol>
 */
public abstract class InputChannel {
    /** Введите информацию о канале, чтобы идентифицировать его глобально в задаче. */
    protected final InputChannelInfo channelInfo;

    /** Этот канал потребляет полученный номер изRP. */
    protected final ResultPartitionID partitionId;

    /** Этот канал использует подразделы и индексы. */
    protected final int consumedSubpartitionIndex;

    protected final SingleInputGate inputGate;
}
  1. на основе Netty изсетевой передачи, передаются буферные блокиприезжать TaskManager2 сетевой стек, за которым следует ConnectionManager контролировать Буфер передачи данных в указанном месте InputChannel и входит в InputGate и, наконец, входит в десериализатор (RecordDeserializer) Данные в Волябуфере восстанавливаются до указанного типа объекта и, наконец, передаются в задачу приема данных.

По сути, это типичная модель производитель-потребитель, куда поступают данные о добыче. ResultPartition(Зависит отResultSubpartitionсоставляют) Средний и нижний перевал InputGate (Зависит отInputChannelсоставляют)данные о потреблении。другойиз task возможно, в том же самом TaskManager При запуске эти задачи можно рассматривать как одно и то же. Различные потоки в процессе TaskManager могут обмениваться данными локально по-разному; task Это может быть и в другом TaskManger При запуске необходимо передать TaskManager обмен данными посредством сетевой связи.

3. Управление потоком сети Flink

представленный ранее Flink на основемодель Производитель-потребитель метода передачи данных, и мы понимаем, что при обработке данных потоковой системой, если скорости обработки восходящих и нисходящих потоков несовместимы, возникнут такие проблемы, как перегрузка данных. На этот раз вам нужна динамическая обратная Механизм связи динамически регулирует скорость отправки и приема данных в соответствии с ситуацией передачи данных в реальном времени, чтобы лучше выполнять передачу по сети, то есть «управление сетевым потоком». В этой главе будут представлены Flink Существует два способа управления сетевым потоком до и после версии 1.5:

  • на основе TCP механизм изпротиводавления
  • на основе Credit механизм изпротиводавления

3.1 Механизм противодавления на основе TCP

Давайте сначала посмотрим, как Flink реализовывал динамическую обратную связь до версии 1.5 для достижения управления сетевым потоком. на основеTCPмеханизм изпротиводавления Нижний слой зависит от「TCPиз алгоритм скользящего окна」,В этой главе не будут вдаваться в подробности,И заново опишу феномен противодавления процесса передачи.

Как видно из рисунка выше, каждый TaskManager будет иметь один общий для всех внутренних задач. Network Buffer Пул, который применяется к ресурсам памяти из памяти вне кучи, может затем предоставить каждый ResultSubpartition создавать Local Buffer Pool。 гипотеза производителя из ставки 2 МБ/с, потребительский тариф 1 МБ/С. Ниже будет описан процесс, в котором буферы на каждом уровне заполняются из-за несоответствия скорости, вызывая противодавление.

3.1.1 Буфер входного канала заполнен

Через некоторое время будет достигнуто состояние, показанное ниже. В это время входной канал временно заполнен, и к пулу локальных буферов необходимо применить новый буфер. с пометкой «Используется».

3.1.2 Потребительский локальный буферный пул заполнен

Поскольку скорости обработки восходящего и нисходящего потоков не совпадают, через некоторое время входной канал завершил обращение к памяти пула локальных буферов. В это время все буферы в пуле локальных буферов помечаются как используемые, но вы можете продолжить. для подачи заявки на буферы из пула сетевых буферов.

3.1.3 Буферный пул потребительской сети заполнен

Постепенно в пуле сетевых буферов не осталось доступных буферов, и все они стали использоваться. В это время потребители больше не могут читать данные, а Netty больше не будет получать данные сокетов.

3.1.4 Сокет останавливает передачу данных

Когда сокет потребителя будет исчерпан, отправителю производителя будет отправлено значение windows=0 (скользящее окно TCP), и сокет прекратит отправку данных.

3.1.5 Netty не доступна для записи

Вскоре буфер сокета будет исчерпан. Netty прекратит отправлять данные в сокет после его обнаружения. Позже, поскольку RecordWriter все еще отправляет данные, данные будут накапливаться в буфере Netty. После определенного уровня Netty станет недоступным для записи, а ResultSubpartition станет недоступным. отправлять данные. Доступна ли запись в Netty. В это время ResultSubpartition прекратит запись данных в Netty.

3.1.5 RecordWriter прекращает запись данных

ResultSubpartition пространство быстро используется до тех пор, пока Local Buffer Pool и Network Buffer Pool из После заполнения буфера RecordWriter Воля Перестаньте записывать данные,До сих пор,Выполнено через TaskManagerпротиводавление.

3.1.6 Проблемы с механизмом противодавления TCP

когда Task из После исчерпания буферного пула сетевое соединение блокируется, и восходящий поток Task Невозможно генерировать данные, ниже по течению Task Невозможно получить данные, что мы называем статусом «противодавление». Но на основеTCPмеханизм изпротиводавления Есть следующие вопросы:

  • один TaskManager Обычно существует несколько индивидуальных задач, и они будут повторно использовать один и тот же сокет внизу. Task В связанном пуле буферов еще есть место, но его нельзя применить к TCP Запись данных в соединение или чтение данных из него.
  • на Основополагающий механизм управления потоком TCP, от ResultPartition приезжать Netty приезжать Все звено разъема длинное, из-за чего линия противодавления не будет достаточно чувствительной и динамической. обратная Процесс связи относительно медленный.

3.2 Механизм противодействия на основе кредитов

3.2.1 Знакомство с алгоритмом

Чтобы решить вышеуказанные проблемы, Flink 1.5 Реконструировал сетевой стек и ввёл "на на основе Кредитный алгоритм управления потоками» (Credit-based Flow Control), то есть управление сетевым потоком реализуется на уровне Flink, сокращая линию противодавления и обеспечивая TaskManager Сетевое соединение между из никогда не будет заблокировано. Credit-based Flow Control Идея на самом деле очень проста: между принимающей стороной и отправляющей стороной устанавливается механизм «кредитного рейтинга». Данные, отправляемые отправляющей стороной принимающей стороне, никогда не превышают кредитную стоимость принимающей стороны. для Для Flink кредитным значением является принимающий TaskManager. доступный Buffer из количества, чтобы гарантировать окончание отправки TaskManager не будет TCP Данные, отправленные во время соединения, превышают доступную емкость приемного буфера. На основе кредита Реализация управления потоком из Специальный механизмдля:

  • Когда отправитель отправляет buffer по времени он принимает текущие накопленные данные из buffer Количество (отставание размер) сообщает принимающей стороне;
  • Принимающая сторона подает заявку на буфер в соответствии с количеством стопок, накопленных на отправляющей стороне;
  • Принимающая сторона объявляет отправляющей сторонедоступный Credit(одиндоступный buffer Переписка один credit);
  • Когда принимающая сторона выделяет N точка Credit отправителю, указывая, что он имеет N индивидуальныйпраздныйиз buffer Может получать данные;
  • Когда отправитель получит N точка Кредит, указывающий, что его можно отправить в сеть. N индивидуальный buffer;
  • только в Credit > 0 Отправляющая сторона отправит его только при условии, что буфер каждый раз, когда отправитель отправляет один; buffer,Credit также соответственно уменьшилась;

3.2.2 Процесс противодавления

  1. Как показано на рисунке, в настоящее время ResultPartition Уже накопилось два индивидуальных Buffer изданных, поэтому передача в базовой сети будет Для передачи данных и бэклога size = 2 Отправлено на принимающую сторону; после того, как нижестоящая сторона получит сообщение о прибытии, общая сумма кредита будет рассчитана на принимающей стороне. 6 индивидуальныйbuffer, Скорость приема 1 индивидуальныйbuffer,backlog size для2индивидуальныйbuffer, credit для 3 индивидуальныйbuffer(6-1-2 = 3)。
  1. На картинке ниже,Отправитель отправляет размер очереди = 2. Однако буферы, полученные на всех уровнях, уже полны, поэтому нисходящий поток возвращается в восходящий. credit для0, указывающий, что из-за несогласованных скоростей обработки восходящего и нисходящего потоков нисходящий поток временно не может обрабатывать данные в это время; ResultPartition Сразуне будет Netty передавать данные,Данные могут быстро накапливаться;,Тем самым достигается эффект приезжатьпротиводавленияиз;

3.2.3 Точки оптимизации

На основе кредитаалгоритммеханизм изпротиводавления,Решены две индивидуальные проблемы:

  • Вы можете напрямую ResultPartition реализация слояпротиводавление,Вместо пропускания потока давления через несколько слоев,Слои обратной связи. улучшить эффективность противодавления,компактнее с задержкой;
  • Не поместит базовый сокет;,тем самым препятствуя передаче данных по сети,Не пущу одинокого человека Task изузкое местодлявсеиндивидуальныйTaskManager из узкого места;

4. Резюме

В этой статье впервые описывается передача данных между TaskManager в Flink, что приводит к «модели производитель-потребитель」по пропускной способностидругойчас,приводит к общим проблемам,а также「динамическая обратная связь」механизмизнеобходимость,и дал понять「противодавление」изконцепция,「противодавление」是流式系统中关于处理способностьиздинамическая обратная механизм связи, и это обратная связь от низшего звена приехать вверх по течению из. 接着介绍ПонятноFlinkиз网络流控механизм,Флинк до версии 1.5,「на основе механизма раздвижного окна TCPиз」выполнитьпротиводавление,Однако существование одной задачи противодавления приведет к тому, что весь индивидуальный TaskManager, общий из Socket, будет недоступен.,И звено противодавления длиннее,Механизм динамической обратной связи медленнее, чем для, и в нем отсутствует точка. Флинк после V1.5,использовать「На основе кредитаалгоритммеханизм изпротиводавления」,Реализуйте противодавление на уровне ResultPartition.,улучшать Понятнопротиводавлениеэффективность。

5. Ссылки

https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks https://www.ververica.com/blog/how-flink-handles-backpressure https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.