Apache Flink, как платформа распределенной потоковой обработки с открытым исходным кодом, получила широкое внимание и применение. В этой статье рассказывается, как создать операционную среду Flink с нуля и запустить на ней пример программы «WordCount».
Flink поддерживает развертывание на трех основных платформах: Linux, MacOS и Windows. В этой статье в качестве примера рассматривается среда Linux.
Необходимые зависимости программного обеспечения следующие:
# Установить JDK
yum install -y java-1.8.0-openjdk-devel
# Установить Мавен
yum install -y maven
Затем загрузите сжатый пакет Flink и распакуйте его:
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
tar -xvf flink-1.14.5-bin-scala_2.12.tgz
В автономном режиме и JobManager, и TaskManager работают на одном компьютере.
# Запустить диспетчер задач
./bin/start-cluster.sh
# Отправьте и запустите программу WordCount
./bin/flink run examples/streaming/WordCount.jar
В этой статье в качестве примера используется автономный режим. В реальных производственных средах рекомендуется развертывать и запускать в режиме кластера.
В режиме кластера JobManager и TaskManager будут развернуты на разных узлах.
Это позволяет Flink достигать высокой надежности и высокой производительности вычислений в распределенной среде.
WordCount — это потоковая программа WordCount, которая считывает текстовые источники и подсчитывает слова в единицах слов.
// Определить текстовый источник DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999);
//Вырезаем каждую строку содержимого в список слов
DataStream<String> words = text
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] split = value.toLowerCase().split("\\W+");
// ...
}
});
//Считаем статистику по словам
DataStream<Tuple2<String, Long>> counts = words
.keyBy(value -> value)
.sum(1);
//Вывод результатов
counts.print();
Скомпилируйте и упакуйте проект и используйте FlinkClient для отправки задания:
mvn clean package
bin/flink run target/wordcount-1.0-SNAPSHOT.jar
Запустите программу, используйте инструмент netcat для отправки входной строки, и вы сможете увидеть статистические результаты в реальном времени:
nc localhost 9999
hello world bye
hello again
Полный пример кода обработчика потока WordCount представлен здесь:
// Импортировать пакеты, связанные с Flink
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class WordCount {
public static void main(String[] args) throws Exception {
// Создать среду выполнения
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Чтение источника данных текстовых строк из файла
DataStream<String> text = env.addSource(new MySourceFunction());
// Разделите каждую строку на слова
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] splits = value.split("\\s+");
for (String word : splits) {
out.collect(word);
}
}
});
// Подсчет групп по словам
DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
.timeWindow(Time.seconds(5))
.sum(1);
// Распечатать окончательный результат
result.print();
// выполнять задания
env.execute("WordCount");
}
// Пользовательский источник текстовых данных
public static class MySourceFunction implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// Чтение текста из файла или коллекции
// ...
ctx.collect("hello world");
}
@Override
public void cancel() {
}
}
}
В этом примере считываются текстовые строки из файла, выполняется статистика частоты слов и выводятся результаты в виде объектного потока. Надеюсь, это может дать вам ссылку на полный пример кода!
Flink может использовать менеджер ресурсов Yarn для управления и планирования выполнения заданий Flink. Основные шаги заключаются в следующем:
Установите Hadoop и настройте менеджер ресурсов Yarn.
Измените файл конфигурации flink-conf.yaml и добавьте следующую конфигурацию:
yarn.distributed.enabled: true
mvn package -Pyarn
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar
Параметр -m указывает использование Yarn в качестве менеджера ресурсов, а -yn -ys указывает количество контейнеров, назначенных задаче.
Статус задания Flink можно просматривать и отслеживать в веб-интерфейсе Yarn ResourceManager.
Вы также можете использовать Flink Cli для остановки и перезапуска заданий, выполняемых в Yarn.
В то же время Yarn также может автоматически увеличивать и уменьшать количество контейнеров в задании Flink в зависимости от нагрузки. Это обеспечивает хорошую интеграцию Flink и Yarn.
Выполнив описанные выше шаги, вы можете использовать возможности управления ресурсами Yarn для управления выполнением распределенных заданий Flink.
Flink поддерживает операции с временными окнами через API таблиц и SQL.
Проиллюстрируем примером:
Импортируйте TableEnvironment Flink:
TableEnvironment tableEnv = TableEnvironment.create(env);
Считайте данные из Kafka и зарегистрируйте их как таблицу:
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.property(...));
Используйте DDL для определения структуры таблицы:
CREATE TABLE inputTable (
id STRING,
timestamp TIMESTAMP,
...)
WITH (...);
Используйте динамические временные окна TUMBLE или HOP.
SELECT
id,
COUNT(*)
FROM
inputTable
GROUP BY
TUMBLE(timestamp, INTERVAL '5' MINUTE)
Поддерживает оконные функции, такие как SUM, COUNT, MAX и другие агрегатные вычисления:
SELECT
SUM(amount)
FROM
inputTable
GROUP BY
HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)
Выведите результаты в Kafka или распечатайте:
tableEnv.toRetractStream[Row]...
Благодаря поддержке временных окон API таблиц и SQL потоки данных временных рядов могут управляться и обрабатываться более эффективно. Разработчики могут использовать знакомый синтаксис SQL для потоковой обработки.
Вот полный пример подсчета слов с использованием SQL:
// Создать среду выполнения
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env);
// Чтение данных текстовой строки из Kafka
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.topic("kafka_topic"))
.withFormat(new SimpleStringSchema())
.createTemporaryTable("lines");
// список сегментации слов
tableEnv.executeSql(
"CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS " +
"SELECT " +
" ROW_NUMBER() OVER() AS id, " +
" word " +
"FROM lines, LATERAL(FLATTEN(SPLIT(lines,' ')))";
// агрегирование оконповерхность
tableEnv.executeSql(
"CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS " +
"SELECT " +
" word, " +
" COUNT(*) AS count " +
"FROM words " +
"GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");
// Вывод результатов
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");
// Выполнить программу
env.execute();
Этот полный пример включает в себя полное SQL-определение ввода данных, сегментацию слов, агрегацию окон и вывод результатов. Надеюсь, вам будет полезно понять процесс обработки потока реализации SQL.
Таким образом, принцип временного окна Flink заключается в следующем: распределять события по окнам в соответствии с временными метками, обновлять статус операций агрегации окон и выводить результаты при закрытии окна. Он не зависит от операторов и вводит концепцию времени для потоковой обработки.
Если половина данных, полученных из Kafka, находится в текущем временном окне, а половина — за его пределами, Flink обработает их следующим образом:
Таким образом, Flink может правильно различать данные внутри и вне временного окна:
Это гарантирует правильность времени и не вызовет ошибок в расчете результатов окна.