Тесный контакт с Flink: комплексная интерпретация вводного и практического руководства по структуре потоковых вычислений.
Тесный контакт с Flink: комплексная интерпретация вводного и практического руководства по структуре потоковых вычислений.

Предисловие

Apache Flink, как платформа распределенной потоковой обработки с открытым исходным кодом, получила широкое внимание и применение. В этой статье рассказывается, как создать операционную среду Flink с нуля и запустить на ней пример программы «WordCount».

Создание окружения Flink

1. Подготовка среды

Flink поддерживает развертывание на трех основных платформах: Linux, MacOS и Windows. В этой статье в качестве примера рассматривается среда Linux.

Необходимые зависимости программного обеспечения следующие:

  • JDK 8 или выше
  • Maven 3.5+
  • Флинк версия 1.14.5
Язык кода:javascript
копировать
# Установить JDK
yum install -y java-1.8.0-openjdk-devel

# Установить Мавен
yum install -y maven

Затем загрузите сжатый пакет Flink и распакуйте его:

Язык кода:javascript
копировать
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz 
tar -xvf flink-1.14.5-bin-scala_2.12.tgz

2. Запустите Flink в автономном режиме.

В автономном режиме и JobManager, и TaskManager работают на одном компьютере.

Язык кода:javascript
копировать
# Запустить диспетчер задач
./bin/start-cluster.sh

# Отправьте и запустите программу WordCount
./bin/flink run examples/streaming/WordCount.jar

В этой статье в качестве примера используется автономный режим. В реальных производственных средах рекомендуется развертывать и запускать в режиме кластера.

3. Режим распределенного кластера

В режиме кластера JobManager и TaskManager будут развернуты на разных узлах.

  • Сначала запустите ResourceManager на машине
  • Запустите TaskManager на других рабочих узлах.
  • Отправьте задание в JobManager для планирования и выполнения.

Это позволяет Flink достигать высокой надежности и высокой производительности вычислений в распределенной среде.

4. Напишите программу WordCount.

WordCount — это потоковая программа WordCount, которая считывает текстовые источники и подсчитывает слова в единицах слов.

Язык кода:javascript
копировать
// Определить текстовый источник DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999); 

//Вырезаем каждую строку содержимого в список слов
DataStream<String> words = text
  .flatMap(new FlatMapFunction<String, String>() {
     public void flatMap(String value, Collector<String> out) {
       String[] split = value.toLowerCase().split("\\W+");
       // ...
    }
  });
  
//Считаем статистику по словам        
DataStream<Tuple2<String, Long>> counts = words
  .keyBy(value -> value)
  .sum(1);
  
//Вывод результатов
counts.print();

5. Операция и результаты

Скомпилируйте и упакуйте проект и используйте FlinkClient для отправки задания:

Язык кода:javascript
копировать
mvn clean package

bin/flink run target/wordcount-1.0-SNAPSHOT.jar

Запустите программу, используйте инструмент netcat для отправки входной строки, и вы сможете увидеть статистические результаты в реальном времени:

Язык кода:javascript
копировать
nc localhost 9999
hello world bye
hello again

6.Примеры кода

Полный пример кода обработчика потока WordCount представлен здесь:

Язык кода:javascript
копировать
// Импортировать пакеты, связанные с Flink
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WordCount {

  public static void main(String[] args) throws Exception {

    // Создать среду выполнения
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Чтение источника данных текстовых строк из файла
    DataStream<String> text = env.addSource(new MySourceFunction());

    // Разделите каждую строку на слова
    DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
      public void flatMap(String value, Collector<String> out) {
        String[] splits = value.split("\\s+");
        for (String word : splits) {
          out.collect(word);
        }
      }
    });

    // Подсчет групп по словам
    DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
      .timeWindow(Time.seconds(5)) 
      .sum(1);

    // Распечатать окончательный результат
    result.print();

    // выполнять задания
    env.execute("WordCount");

  }

  // Пользовательский источник текстовых данных
  public static class MySourceFunction implements SourceFunction<String> {

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      // Чтение текста из файла или коллекции 
      // ...
      ctx.collect("hello world"); 
    }

    @Override
    public void cancel() {

    }
  }

}

В этом примере считываются текстовые строки из файла, выполняется статистика частоты слов и выводятся результаты в виде объектного потока. Надеюсь, это может дать вам ссылку на полный пример кода!

Flink интегрируется с Yarn

Flink может использовать менеджер ресурсов Yarn для управления и планирования выполнения заданий Flink. Основные шаги заключаются в следующем:

1. Установите и настройте Yarn

Установите Hadoop и настройте менеджер ресурсов Yarn.

2. Настройте Flink для поддержки Yarn.

Измените файл конфигурации flink-conf.yaml и добавьте следующую конфигурацию:

Язык кода:javascript
копировать
yarn.distributed.enabled: true

3. Упакуйте проект Flink как приложение Yarn.

Язык кода:javascript
копировать
mvn package -Pyarn 

4. Отправьте задание Flink в Yarn.

Язык кода:javascript
копировать
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar

Параметр -m указывает использование Yarn в качестве менеджера ресурсов, а -yn -ys указывает количество контейнеров, назначенных задаче.

5. Задача мониторинга Yarn WebUI

Статус задания Flink можно просматривать и отслеживать в веб-интерфейсе Yarn ResourceManager.

6. Остановите и перезапустите задания.

Вы также можете использовать Flink Cli для остановки и перезапуска заданий, выполняемых в Yarn.

В то же время Yarn также может автоматически увеличивать и уменьшать количество контейнеров в задании Flink в зависимости от нагрузки. Это обеспечивает хорошую интеграцию Flink и Yarn.

Выполнив описанные выше шаги, вы можете использовать возможности управления ресурсами Yarn для управления выполнением распределенных заданий Flink.

Flink управляет sql через временное окно

Flink поддерживает операции с временными окнами через API таблиц и SQL.

Проиллюстрируем примером:

1. Определите источники данных

Импортируйте TableEnvironment Flink:

Язык кода:javascript
копировать
TableEnvironment tableEnv = TableEnvironment.create(env);

Считайте данные из Kafka и зарегистрируйте их как таблицу:

Язык кода:javascript
копировать
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .property(...));

2. Определить структуру таблицы

Используйте DDL для определения структуры таблицы:

Язык кода:javascript
копировать
CREATE TABLE inputTable (
  id STRING, 
  timestamp TIMESTAMP,
  ...)
WITH (...); 

3. Определите окно

Используйте динамические временные окна TUMBLE или HOP.

Язык кода:javascript
копировать
SELECT 
  id, 
  COUNT(*) 
FROM 
  inputTable
GROUP BY 
  TUMBLE(timestamp, INTERVAL '5' MINUTE)

4. Преобразование окон

Поддерживает оконные функции, такие как SUM, COUNT, MAX и другие агрегатные вычисления:

Язык кода:javascript
копировать
SELECT 
  SUM(amount) 
FROM 
  inputTable
GROUP BY 
  HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)

5. Вывод результатов

Выведите результаты в Kafka или распечатайте:

Язык кода:javascript
копировать
tableEnv.toRetractStream[Row]...

Благодаря поддержке временных окон API таблиц и SQL потоки данных временных рядов могут управляться и обрабатываться более эффективно. Разработчики могут использовать знакомый синтаксис SQL для потоковой обработки.

6. Пример кода задачи SQL

Вот полный пример подсчета слов с использованием SQL:

Язык кода:javascript
копировать
// Создать среду выполнения
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env); 

// Чтение данных текстовой строки из Kafka
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .topic("kafka_topic"))
  .withFormat(new SimpleStringSchema())
  .createTemporaryTable("lines");

// список сегментации слов
tableEnv.executeSql(
  "CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS " +
  "SELECT " +
  "  ROW_NUMBER() OVER() AS id, " +
  "  word " + 
  "FROM lines, LATERAL(FLATTEN(SPLIT(lines,' ')))";

// агрегирование оконповерхность   
tableEnv.executeSql(
  "CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS " +
  "SELECT " +
  "  word, " +
  "  COUNT(*) AS count " +
  "FROM words " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");

// Вывод результатов
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");

// Выполнить программу
env.execute();

Этот полный пример включает в себя полное SQL-определение ввода данных, сегментацию слов, агрегацию окон и вывод результатов. Надеюсь, вам будет полезно понять процесс обработки потока реализации SQL.

Описание временного окна

1. Окно прокрутки

  • Существует два типа раздвижных окон: окна фиксированной длины (TUMBLE) и раздвижные окна (HOP).
  • Окна фиксированной длины фиксируют события в последовательных временных окнах фиксированного размера, и эти окна не перекрываются.
  • Скользящее окно перемещается с фиксированным интервалом времени, а перекрывающиеся части окон можно рассчитывать повторно.

2. Распределение окон

  • Каждое событие назначается соответствующей группе окон на основе временной метки.
  • Распределение окон реализовано с помощью оконной функции TIMESTAMP_WINDOW (timeField, размер окна).

3. Агрегация окон

  • После распределения событий над каждым окном выполняются операции агрегирования (такие как COUNT, SUM и т. д.).
  • Windows сохранит промежуточные результаты в серверной части состояния (например, RocksDB).

4. Окно вывода результатов

  • Когда окно закрывается (истекает срок действия), выводится окончательный результат.
  • Промежуточные результаты также могут выводиться заранее или периодически.

5. Управление статусами

  • Статус окна будет сохранен в виде снимка, чтобы обеспечить возможность возобновления и перезапуска после точек останова.
  • Состоянием управляет KeyedStateBackend, например RocksDB.

Таким образом, принцип временного окна Flink заключается в следующем: распределять события по окнам в соответствии с временными метками, обновлять статус операций агрегации окон и выводить результаты при закрытии окна. Он не зависит от операторов и вводит концепцию времени для потоковой обработки.

6. Та же логика пакетной обработки временного окна.

Если половина данных, полученных из Kafka, находится в текущем временном окне, а половина — за его пределами, Flink обработает их следующим образом:

  • Сначала данные распределяются по соответствующей группе разделов временного окна (состояние ключа) в соответствии с меткой времени события.
  • Обрабатывайте каждую группу разделов временного окна отдельно:
    • Данные во временном окне агрегируются и рассчитываются в соответствии с обычным процессом.
    • Данные за пределами временного окна не будут участвовать в агрегации текущего окна, но будут добавлены в противодавление клавиши.
  • Когда выводятся результаты окна:
    • Выводятся только результаты группы разделов, текущее окно которой было закрыто. Другие группы разделов включены и не будут выводиться.
  • Периодически проверяйте состояние окна:
    • Закройте те окна истечения срока действия, которые находятся за пределами временного диапазона.
    • Продолжайте накапливать статус для окон, срок действия которых еще не истек, и выводить результаты после истечения срока действия.

Таким образом, Flink может правильно различать данные внутри и вне временного окна:

  • Данные в окне участвуют в расчете текущего окна.
  • Добавьте обратное давление к данным за пределами окна и обработайте их в будущем окне.
  • Выводить результаты только для фактического окна истечения срока действия

Это гарантирует правильность времени и не вызовет ошибок в расчете результатов окна.

boy illustration
Учебное пособие по Jetpack Compose для начинающих, базовые элементы управления и макет
boy illustration
Код js веб-страницы, фон частицы, код спецэффектов
boy illustration
【новый! Суперподробное】Полное руководство по свойствам компонентов Figma.
boy illustration
🎉Обязательно к прочтению новичкам: полное руководство по написанию мини-программ WeChat с использованием программного обеспечения Cursor.
boy illustration
[Забавный проект Docker] VoceChat — еще одно приложение для мгновенного чата (IM)! Может быть встроен в любую веб-страницу!
boy illustration
Как реализовать переход по странице в HTML (html переходит на указанную страницу)
boy illustration
Как решить проблему зависания и низкой скорости при установке зависимостей с помощью npm. Существуют ли доступные источники npm, которые могут решить эту проблему?
boy illustration
Серия From Zero to Fun: Uni-App WeChat Payment Practice WeChat авторизует вход в систему и украшает страницу заказа, создает интерфейс заказа и инициирует запрос заказа
boy illustration
Серия uni-app: uni.navigateЧтобы передать скачок значения
boy illustration
Апплет WeChat настраивает верхнюю панель навигации и адаптируется к различным моделям.
boy illustration
JS-время конвертации
boy illustration
Обеспечьте бесперебойную работу ChromeDriver 125: советы по решению проблемы chromedriver.exe не найдены
boy illustration
Поле комментария, щелчок мышью, специальные эффекты, js-код
boy illustration
Объект массива перемещения объекта JS
boy illustration
Как открыть разрешение на позиционирование апплета WeChat_Как использовать WeChat для определения местонахождения друзей
boy illustration
Я даю вам два набора из 18 простых в использовании фонов холста Power BI, так что вам больше не придется возиться с цветами!
boy illustration
Получить текущее время в js_Как динамически отображать дату и время в js
boy illustration
Вам необходимо изучить сочетания клавиш vsCode для форматирования и организации кода, чтобы вам больше не приходилось настраивать формат вручную.
boy illustration
У ChatGPT большое обновление. Всего за 45 минут пресс-конференция показывает, что OpenAI сделал еще один шаг вперед.
boy illustration
Copilot облачной разработки — упрощение разработки
boy illustration
Микросборка xChatGPT с низким кодом, создание апплета чат-бота с искусственным интеллектом за пять шагов
boy illustration
CUDA Out of Memory: идеальное решение проблемы нехватки памяти CUDA
boy illustration
Анализ кластеризации отдельных ячеек, который должен освоить каждый&MarkerгенетическийВизуализация
boy illustration
vLLM: мощный инструмент для ускорения вывода ИИ
boy illustration
CodeGeeX: мощный инструмент генерации кода искусственного интеллекта, который можно использовать бесплатно в дополнение к второму пилоту.
boy illustration
Машинное обучение Реальный бой LightGBM + настройка параметров случайного поиска: точность 96,67%
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция без кодирования и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
LM Studio для создания локальных больших моделей
boy illustration
Как определить количество слоев и нейронов скрытых слоев нейронной сети?
boy illustration
[Отслеживание целей] Подробное объяснение ByteTrack и детали кода