Apache Flink — это распределенный механизм обработки больших данных, который может выполнять вычисления с сохранением или без сохранения состояния для ограниченных и неограниченных потоков данных. Его можно развертывать в различных кластерных средах и выполнять быстрые вычисления с данными различных размеров. Поскольку мы обрабатываем потоковые данные, нам приходится сталкиваться с проблемами, вызванными несогласованностью восходящей и нисходящей передачи данных, а также скоростью обработки данных во время расчета потока данных. В этой статье передача данных в Flink впервые рассматривается с точки зрения «модели производитель-потребитель», тем самым вводя концепцию «противодавления». Затем рассказывается, как Flink реализует управление сетевым потоком до «Механизма обратного давления на основе TCP» до версии 1.5 и «Механизма обратного давления на основе кредитов» после версии 1.5. Наконец, анализируется случай противодавления, рассказывается о том, как позиционировать противодавление и оптимизацию ресурсов, а также отображаются результаты настройки. Я надеюсь, что после прочтения этой статьи читатели смогут получить более глубокое представление о концепции противодавления узла Flink и лежащих в его основе принципах. Столкнувшись со сценариями противодавления, они смогут быстро обнаружить узкие места и получить набор основных идей по настройке.
Когда задание Flink выполняется,Данные будут передаваться и обмениваться между различными индивидуальными TaskManager(TM).,Upstream TMприезжать Downstream TMиз передачи данных,можно рассматривать просто как производителя&потребительская модель。 будет представлен ниже Producer и Consumer по пропускной способностидругойчас,приводит к общим проблемам。
гипотеза Производительность производителя 2 МБ/с, потребитель 1 MB/s , скорость, с которой восходящий поток генерирует данные в это время больше, чем Данные нисходящей обработки выполняются быстрее, на обоих концах имеются буферы для временного хранения данных, а скорость передачи базовой сети ниже. 2 MB/s。 Если буфер ограничен, через 5 с Потребитель приличный Receive Buffer будут заполнены, а вновь поступившие данные можно будет только отбросить, но в реальных сценариях производители обычно проверяют перед отправкой данных; buffer доступный статус, если buffer Если он находится в недоступном состоянии, новые данные отправляться не будут. Столкнувшись с вышеперечисленными проблемами, нам необходимо иметь динамическую обратная Механизм связи динамически регулирует скорость отправки и приема данных в соответствии с ситуацией передачи данных в реальном времени, чтобы лучше выполнять передачу по сети. Динамическую обратную связь можно разделить на следующие два типа:
Введение в предыдущий раздел потоков,Мы понимаем приезжать,Когда данные о добыче и потреблении в нисходящем направлении несовместимы.,вызовет некоторые проблемы,В настоящее время необходим механизм «динамической обратной связи».,Далее вводится понятие «противодавление».
«Противодавление» — об Обработке в стриминговой системе. данныхспособностьиздинамическая обратная Механизм связи, представляющий собой обратную связь от нижестоящего узла, обычно происходит в режиме реального времени. Обработка данныеиз процесса, восходящий участок точкииз скорости производства больше, Чем ниже по течению точкаиз скорости потребления из ситуации.
будет представлен Ниже Во Флинке, как передавать данные между Таск Менеджерами, посмотрите Flink Конкретная форма шаблона производитель-потребитель при передаче данных.
Следующий рисунок взят из Apache Flink (http://flink.apache.org). Понятия, связанные с этим рисунком: ResultPartition (RP) ResultSubPartition (RS) InputChannel (IC) InputGate (IG)
MapDriver передает данные RecordWriter, затем распределяет данные через ChannelSelector и передает данные одному или нескольким RecordSerializers для сериализации и преобразования в двоичный поток. Для ChannelSelector существует два режима распространения: один — широковещательный, при котором данные отправляются каждому сериализатору для обработки. Другой — выбор в соответствии с некоторой логикой, например вычислением хэша данных и последующей маршрутизацией к попаданию. последовательность.
/**
* The {@link ChannelSelector} determines to which logical channels a record should be written to.
*
* @param <T> the type of record which is sent through the attached output gate
*/
public interface ChannelSelector<T extends IOReadableWritable> {
/**
* Initializes the channel selector with the number of output channels.
*
* @param numberOfChannels the total number of output channels which are attached to respective
* output gate.
*/
void setup(int numberOfChannels);
/**
* Returns the logical channel index, to which the given record should be written. It is illegal
* to call this method for broadcast channel selectors and this method can remain not
* implemented in that case (for example by throwing {@link UnsupportedOperationException}).
*
* Выберите режим
*
* @param record the record to determine the output channels for.
* @return an integer number which indicates the index of the output channel through which the
* record shall be forwarded.
*/
int selectChannel(T record);
/**
* Returns whether the channel selector always selects all the output channels.
* режим трансляции
*
* @return true if the selector is for broadcast mode.
*/
boolean isBroadcast();
}
public abstract class ResultPartition implements ResultPartitionWriter {
protected final ResultPartitionID partitionId;
/** Тип раздела определяется для реализации с использованием определенных подразделов. */
protected final ResultPartitionType partitionType;
protected final ResultPartitionManager partitionManager;
/** Subpartition изиндивидуальный数 */
protected final int numSubpartitions;
// - Runtime state --------------------------------------------------------
/** ResultPartition средняя буферная зона */
protected BufferPool bufferPool;
}
/**
* An input channel consumes a single {@link ResultSubpartitionView}.
*
* <p>For each channel, the consumption life cycle is as follows:
*
* <ol>
* <li>{@link #requestSubpartition()}
* <li>{@link #getNextBuffer()}
* <li>{@link #releaseAllResources()}
* </ol>
*/
public abstract class InputChannel {
/** Введите информацию о канале, чтобы идентифицировать его глобально в задаче. */
protected final InputChannelInfo channelInfo;
/** Этот канал потребляет полученный номер изRP. */
protected final ResultPartitionID partitionId;
/** Этот канал использует подразделы и индексы. */
protected final int consumedSubpartitionIndex;
protected final SingleInputGate inputGate;
}
По сути, это типичная модель производитель-потребитель, куда поступают данные о добыче. ResultPartition(Зависит отResultSubpartitionсоставляют) Средний и нижний перевал InputGate (Зависит отInputChannelсоставляют)данные о потреблении。другойиз task возможно, в том же самом TaskManager При запуске эти задачи можно рассматривать как одно и то же. Различные потоки в процессе TaskManager могут обмениваться данными локально по-разному; task Это может быть и в другом TaskManger При запуске необходимо передать TaskManager обмен данными посредством сетевой связи.
представленный ранее Flink на основемодель Производитель-потребитель метода передачи данных, и мы понимаем, что при обработке данных потоковой системой, если скорости обработки восходящих и нисходящих потоков несовместимы, возникнут такие проблемы, как перегрузка данных. На этот раз вам нужна динамическая обратная Механизм связи динамически регулирует скорость отправки и приема данных в соответствии с ситуацией передачи данных в реальном времени, чтобы лучше выполнять передачу по сети, то есть «управление сетевым потоком». В этой главе будут представлены Flink Существует два способа управления сетевым потоком до и после версии 1.5:
Давайте сначала посмотрим, как Flink реализовывал динамическую обратную связь до версии 1.5 для достижения управления сетевым потоком. на основеTCPмеханизм изпротиводавления Нижний слой зависит от「TCPиз алгоритм скользящего окна」,В этой главе не будут вдаваться в подробности,И заново опишу феномен противодавления процесса передачи.
Как видно из рисунка выше, каждый TaskManager будет иметь один общий для всех внутренних задач. Network Buffer Пул, который применяется к ресурсам памяти из памяти вне кучи, может затем предоставить каждый ResultSubpartition создавать Local Buffer Pool。 гипотеза производителя из ставки 2 МБ/с, потребительский тариф 1 МБ/С. Ниже будет описан процесс, в котором буферы на каждом уровне заполняются из-за несоответствия скорости, вызывая противодавление.
Через некоторое время будет достигнуто состояние, показанное ниже. В это время входной канал временно заполнен, и к пулу локальных буферов необходимо применить новый буфер. с пометкой «Используется».
Поскольку скорости обработки восходящего и нисходящего потоков не совпадают, через некоторое время входной канал завершил обращение к памяти пула локальных буферов. В это время все буферы в пуле локальных буферов помечаются как используемые, но вы можете продолжить. для подачи заявки на буферы из пула сетевых буферов.
Постепенно в пуле сетевых буферов не осталось доступных буферов, и все они стали использоваться. В это время потребители больше не могут читать данные, а Netty больше не будет получать данные сокетов.
Когда сокет потребителя будет исчерпан, отправителю производителя будет отправлено значение windows=0 (скользящее окно TCP), и сокет прекратит отправку данных.
Вскоре буфер сокета будет исчерпан. Netty прекратит отправлять данные в сокет после его обнаружения. Позже, поскольку RecordWriter все еще отправляет данные, данные будут накапливаться в буфере Netty. После определенного уровня Netty станет недоступным для записи, а ResultSubpartition станет недоступным. отправлять данные. Доступна ли запись в Netty. В это время ResultSubpartition прекратит запись данных в Netty.
ResultSubpartition пространство быстро используется до тех пор, пока Local Buffer Pool и Network Buffer Pool из После заполнения буфера RecordWriter Воля Перестаньте записывать данные,До сих пор,Выполнено через TaskManagerпротиводавление.
когда Task из После исчерпания буферного пула сетевое соединение блокируется, и восходящий поток Task Невозможно генерировать данные, ниже по течению Task Невозможно получить данные, что мы называем статусом «противодавление». Но на основеTCPмеханизм изпротиводавления Есть следующие вопросы:
Чтобы решить вышеуказанные проблемы, Flink 1.5 Реконструировал сетевой стек и ввёл "на на основе Кредитный алгоритм управления потоками» (Credit-based Flow Control), то есть управление сетевым потоком реализуется на уровне Flink, сокращая линию противодавления и обеспечивая TaskManager Сетевое соединение между из никогда не будет заблокировано. Credit-based Flow Control Идея на самом деле очень проста: между принимающей стороной и отправляющей стороной устанавливается механизм «кредитного рейтинга». Данные, отправляемые отправляющей стороной принимающей стороне, никогда не превышают кредитную стоимость принимающей стороны. для Для Flink кредитным значением является принимающий TaskManager. доступный Buffer из количества, чтобы гарантировать окончание отправки TaskManager не будет TCP Данные, отправленные во время соединения, превышают доступную емкость приемного буфера. На основе кредита Реализация управления потоком из Специальный механизмдля:
На основе кредитаалгоритммеханизм изпротиводавления,Решены две индивидуальные проблемы:
В этой статье впервые описывается передача данных между TaskManager в Flink, что приводит к «модели производитель-потребитель」по пропускной способностидругойчас,приводит к общим проблемам,а также「динамическая обратная связь」механизмизнеобходимость,и дал понять「противодавление」изконцепция,「противодавление」是流式系统中关于处理способностьиздинамическая обратная механизм связи, и это обратная связь от низшего звена приехать вверх по течению из. 接着介绍ПонятноFlinkиз网络流控механизм,Флинк до версии 1.5,「на основе механизма раздвижного окна TCPиз」выполнитьпротиводавление,Однако существование одной задачи противодавления приведет к тому, что весь индивидуальный TaskManager, общий из Socket, будет недоступен.,И звено противодавления длиннее,Механизм динамической обратной связи медленнее, чем для, и в нем отсутствует точка. Флинк после V1.5,использовать「На основе кредитаалгоритммеханизм изпротиводавления」,Реализуйте противодавление на уровне ResultPartition.,улучшать Понятнопротиводавлениеэффективность。
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks https://www.ververica.com/blog/how-flink-handles-backpressure https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn