[Серия полюсов] Flink интегрирует KafkaSink & Выходные данные в реальном времени (11)
[Серия полюсов] Flink интегрирует KafkaSink & Выходные данные в реальном времени (11)

01 Введение

Язык кода:javascript
копировать
KafkaSink Поток данных может быть записан в один или несколько Kafka topic
Фактический адрес исходного кода, доступна загрузка в один клик: https://gitee.com/shawsongyue/aurora.git
Модуль: aurora_flink_connector_kafka
Пример: KafkaSinkStreamingJob

02 Зависимость разъема

2.1 Зависимости коннектора Kafka

Язык кода:javascript
копировать
        <!--kafkaполагаться start-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!--kafkaполагаться end-->

2.2 базовые базовые зависимости

Язык кода:javascript
копировать
     Если эта зависимость не введена, сообщение об ошибке будет сообщено непосредственно при запуске проекта: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter
Язык кода:javascript
копировать
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>

03 Как использовать

Kafka sink Классы сборки предназначены для создания KafkaSink Пример

Язык кода:javascript
копировать
DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

Следующие объекты недвижимости существуют KafkaSink Необходимо указать, когда:
Bootstrap servers,setBootstrapServers(String)
информациясериализатор(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
При использовании DeliveryGuarantee.EXACTLY_ONCE из семантической гарантии вам нужно использовать setTransactionalIdPrefix(String)

04 сериализатор

  1. Требуется при строительстве KafkaRecordSerializationSchema для преобразования входных данных в Kafka из ProducerRecord。Flink предоставил schema строитель Чтобы предоставить некоторые общие компоненты, такие как сериализация ключа (ключа)/тела (значения) сообщения, раздел Выбором и разделением сообщений также можно управлять путем реализации соответствующих интерфейсов для более широкого управления.
  2. Среди них метод сериализации тела (значения) сообщения и topic В методе выбора необходимо указать из. Кроме того, вы также можете пройти. setKafkaKeySerializer(Serializer) или setKafkaValueSerializer(Serializer) использовать Kafka предоставить вместо Flink предоставлено сериализатором
Язык кода:javascript
копировать
KafkaRecordSerializationSchema.builder()
    .setTopicSelector((element) -> {<your-topic-selection-logic>})
    .setValueSerializationSchema(new SimpleStringSchema())
    .setKeySerializationSchema(new SimpleStringSchema())
    .setPartitioner(new FlinkFixedPartitioner())
    .build();

05 Восстановление отказоустойчивости

Язык кода:javascript
копировать
`KafkaSink` Всего поддерживаются три различных семантических гарантии («DeliveryGuarantee»). для `DeliveryGuarantee.AT_LEAST_ONCE` и `DeliveryGuarantee.EXACTLY_ONCE`,Flink checkpoint Должно быть включено. По умолчанию `KafkaSink` использовать `DeliveryGuarantee.NONE`。 Ниже приводится объяснение различных семантических гарантий:
  • DeliveryGuarantee.NONE Никаких гарантий не предоставляется: сообщение может измениться из-за Kafka broker из Причины утраты или Причины Flink Неисправность возникает неоднократно.
  • DeliveryGuarantee.AT_LEAST_ONCE: sink существовать checkpoint буду ждать Kafka Все файлы в буфере Kafka producer подтверждать. новости не будет Kafka broker Он потерян из-за события в конце, но может существовать. Flink Повторяется при перезагрузке, потому что Flink Переобработаем старые данные.
  • DeliveryGuarantee.EXACTLY_ONCE: В этом режиме Кафка sink все будутданныепроходитьсуществовать checkpoint При фиксации транзакции пишется. Следовательно, если consumer Представленные изданные только для чтения (см. Kafka consumer Конфигурация isolation.level),существовать Flink дублирование данных не происходит при перезагрузке. Однако это сделало бы данныесуществовать. checkpoint Он не будет виден до завершения, поэтому внесите необходимые изменения. checkpoint из интервала. Пожалуйста, подтвердите транзакцию ID Префикс (transactionIdPrefix) уникален для разных приложений, чтобы обеспечить транзакции для разных заданий. Не будут влиять друг на друга! Кроме того, настоятельно рекомендуется Kafka Тайм-аут транзакции настроен намного больше, чем checkpoint максимальный интервал + Максимальное время перезапуска, в противном случае Kafka Истечение срока действия незафиксированных транзакций приведет к потере транзакций.

05 Мониторинг показателей

Kafka sink встречасуществоватьдругойизобъем(Scope)Сообщите о следующеминдекс。

объем

индекс

пользовательские переменные

описывать

тип

оператор

currentSendTime

n/a

Для отправки последних данных потребовалось время. Индекс отражает мгновенное значение последних данных.

Gauge

06 Практика исходного кода проекта

6.1 Структура пакета

6.2 Зависимость pom.xml

Язык кода:javascript
копировать
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink_connector_kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--Настройки недвижимости-->
    <properties>
        <!--java_JDKВерсия-->
        <java.version>11</java.version>
        <!--mavenПакетплагин-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--Скомпилированная кодировкаUTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--Кодировка выходного отчетаUTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--jsonданные Инструменты обработки форматов-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4jВерсия-->
        <log4j.version>2.17.1</log4j.version>
        <!--flinkВерсия-->
        <flink.version>1.18.0</flink.version>
        <!--scalaВерсия-->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <!--Универсальныйполагаться-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================Интегрируйте внешниеполагаться==========================================-->
        <!--Интегрированная система ведения журналов start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--Интегрированная система ведения журналов end-->

        <!--kafkaполагаться start-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!--kafkaполагаться end-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
    </dependencies>

    <!--компилировать Пакет-->
    <build>
        <finalName>${project.name}</finalName>
        <!--Файл ресурсов Пакет-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.aurora.KafkaStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--плагин Единое управление-->
        <pluginManagement>
            <plugins>
                <!--mavenПакетплагин-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--компилировать Пакетплагин-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--КонфигурацияMavenТребуется в проектеиспользоватьиз Удаленный склад-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--используется для Конфигурацияmavenплагиниз Удаленный склад-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

6.3 Файл конфигурации

(1)application.properties

Язык кода:javascript
копировать
Адрес кластера #kafka
kafka.bootstrapServers=localhost:9092
#кафкатопик
kafka.topic=topic_a
#kafkaconsumergroup
kafka.group=aurora_group

(2)log4j2.properties

Язык кода:javascript
копировать
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

6.4 Создание задания приемника

Язык кода:javascript
копировать
package com.aurora;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

/**
 * @author Легкое лето из кота
 * @description kafka Коннектор использовать демонстрационное задание
 * @datetime 22:21 2024/2/1
 */
public class KafkaSinkStreamingJob {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamingJob.class);

    public static void main(String[] args) throws Exception {

        //==============1. Получить параметры========================== = =
        //Определяем путь к файлу
        String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_kafka\\src\\main\\resources\\application.properties";
        //Метод 1: напрямую использовать встроенный класс инструмента
        ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);

        //===============2. Инициализируем параметры Кафки========================. = ===
        String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
        String topic = paramsMap.get("kafka.topic");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                //Устанавливаем адрес Кафки
                .setBootstrapServers(bootstrapServers)
                //Устанавливаем режим порядкового номера сообщения
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // хотя бы один раз
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();


        //=================4. Создайте рабочую среду Flink=================.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> listData = new ArrayList<>();
        listData.add("test");
        listData.add("java");
        listData.add("c++");
        DataStreamSource<String> dataStreamSource = env.fromCollection(listData);

        //==================5.данные Простая обработка=====================
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String record, Collector<String> collector) throws Exception {
                logger.info("толькосуществоватьиметь дело сkafkaданные:{}", record);
                collector.collect(record);
            }
        });

        //данныевыходоператор        flatMap.sinkTo(sink);

        //=================6. Запускаем сервис======================= === ==============
        //Включаем функцию flinkizcheckpoint: запускаем контрольную точку каждые 1000 мс (устанавливаем цикл операторов checkpointiz)
        env.enableCheckpointing(1000);
        // дополнительные настройки контрольной точки
        //Устанавливаем режим контрольной точки ровно один раз (это также значение по умолчанию)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Убедитесь, что интервал между контрольными точками составляет не менее 500 мс (т. е. контрольная точка с минимальным интервалом)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        //Убедимся, что проверка должна быть завершена в течение 1 минуты, иначе она будет сброшена (т.е. чекпоинт из таймаута)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //Одновременно разрешается эксплуатировать только одну контрольную точку
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //Даже после отмены программы контрольная точка сохранится, чтобы ее можно было восстановить до указанной контрольной точки в соответствии с фактическими потребностями.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //настраиватьstatebackend,Укажите состояние и КПП и зданные места хранения (КПП и зданные должны иметь постоянное хранилище)
        env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink/checkPoint");
        env.execute();
    }

}
boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.