[Tianyan Series 02] Глубокое понимание компонента Flink FileSink: сохранение потоковых данных в реальном времени и пакетная запись.
[Tianyan Series 02] Глубокое понимание компонента Flink FileSink: сохранение потоковых данных в реальном времени и пакетная запись.
Apache Flink — это мощная платформа потоковой обработки, а FileSink, как один из ее ключевых компонентов, отвечает за вывод результатов потоковой обработки в файлы. В этой статье будут подробно представлены все аспекты FileSink, включая основные концепции, детали реализации и примеры кода.

01 Основные понятия

FileSink — это функция Sink в Flink, которая используется для вывода обработанных данных в файловую систему. Он способен обрабатывать потоки данных в реальном времени и предоставляет гибкие возможности конфигурации, которые позволяют пользователям определять формат, путь и стратегию записи выходных файлов.


02 Принцип работы

FileSink — это функция Sink в Apache Flink, которая используется для вывода данных результатов потоковой обработки в файловую систему. Этот принцип включает в себя модель обработки потока данных Flink и операции файловой системы.

  1. Реализация интерфейса приемника:FileSink Осуществленный Flink в Sink интерфейс,чтобы его можно было добавить к задачам потоковой обработки,И получить поток данных для обработки.
  2. буферизация данных и пакетная обработка:FileSink Полученные данные будут буферизоваться до тех пор, пока буфер не достигнет определенного размера или определенного интервала времени, а затем данные будут записываться в файл пакетами. Этот механизм пакетной обработки может эффективно сократить количество файлов. системаизписатьнакладные расходы,улучшатьписатьэффективность。
  3. Группирование данных:дляулучшатьписать Параллелизм и редукцияфайловая Конфликт одновременной записи для системы, FileSink может разделить данные на несколько разных файловых сегментов для обработки. Каждый файловый сегмент соответствует выходному файлу.,Вданные потоки данных будут распределены по разным файловым сегментам в соответствии с определенными правилами.,Тогда соответственнописатьсоответствоватьизв файле。
  4. Запись конфигурации политики:FileSink Обеспечивает богатыйиз Конфигурация Параметры,Позволяет пользователям гибко настраивать стратегию письма в соответствии со своими потребностями. Пользователи могут устанавливать такие параметры, как размер пакета, путь и правила именования для записи файлов, метод DataFormat, алгоритм сжатия и т. д.,для удовлетворения потребностей различных сценариев.
  5. Сопровождение сделок:чтобы гарантироватьданныеписатьиз Атомность и последовательность,FileSink поддерживает механизм транзакций. При записи данных,Будет использовать дела для обеспечения целостности данных,т.е. либо все данные успешно записаны в файл,Или не пиши вообще,Избегайте отсутствующей или противоречивой информации.
  6. Восстановление:FileSink иметь Восстановлениемеханизм,Возможность перезапуска и продолжения записи после сбоя узла или сбоя задачи. Через контрольно-пропускные пункты и механизмы управления состоянием,Это гарантирует, что последний статус может продолжать записываться после перезапуска задачи.,при этом гарантируя целостность и надежность данных.
  7. Операции с файловой системой:FileSink в конечном итоге запишет данные в файлы В системе это включает в себя создание, запись, обновление, закрытие и другие операции файлов. файловая информация учитывается при записи данных характеристики производительности системы и ограничения, а также как получить максимальную отдачу от файловой системы возможности параллелизма системы и механизм кэширования.

В целом, принципы FileSink включают буферизацию и пакетную обработку потоков данных, группирование данных, настройку стратегии записи, поддержку транзакций, восстановление после сбоев и операции с файловой системой. Благодаря сочетанию этих механизмов можно добиться записи данных в файловую систему. эффективно и надежно.

03 Роллинг Полиси

В Apache Flink FileSink — это оператор вывода, используемый для записи данных в файл. FileSink предоставляет некоторые параметры конфигурации политики смены (Rolling Policy), которые используются для управления сменой выходного файла. Стратегия прокрутки определяет, когда создаются новые файлы, как определяются имена файлов и когда закрываются старые файлы. Ниже приведены распространенные стратегии роллинга во Flink и их использование:

Роллинг по времени Policy by Time): Эта стратегия перемещает файлы на основе временных интервалов, обычно на основе временных меток обработки или фиксированных временных окон.

Язык кода:javascript
копировать
// Например, настройка, которая прокручивается каждый час
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
    .builder()
    .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
    .build();

Прокатка по размеру Policy by Size): Эта стратегия распределяет файлы в зависимости от их размера и обычно используется для ограничения максимального размера каждого файла.

Язык кода:javascript
копировать
// Например, максимальный размер каждого файла составляет 100. настройки МБ
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
    .builder()
    .withMaxPartSize(1024 * 1024 * 100) // 100 MB
    .build();

Прокатка по состоянию (Прокатка Policy by Condition): Эта стратегия выполняет смену файлов на основе определенных условий, например, смену файлов при выполнении определенного количества событий или пользовательского условия.

Язык кода:javascript
копировать
// Например, настройка, которая прокручивает каждые 1000 событий.
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
    .builder()
    .withMaxPartSize(1024 * 1024 * 100) // 100 MB
    .withMaxPartSize(1000) // Делайте ролл каждые 1000 событий
    .build();

Прокрутка по снимку Policy by checkPoint):основнойдапротивforBulkFormatрежим столбцаизскользящая стратегия,довольно особенный

Язык кода:javascript
копировать
CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
        return false;
    }
};

Эти стратегии прокатки можно комбинировать и настраивать в зависимости от конкретных потребностей. Например, вы можете настроить прокручиваемые файлы по времени и размеру, чтобы гарантировать, что выходной файл соответствует требованиям времени, но не превышает определенный предел размера. Стратегия прокатки FileSink предоставляет гибкие возможности конфигурации для удовлетворения потребностей различных сценариев вывода.


04 Ведроназначатель

В Flink FileSink использует концепцию Bucket для организации файлов и управления ими. Так называемая корзина относится к папке, в которую должны помещаться данные.

Группирование по времени Formatting): Сегменты можно форматировать по времени, что полезно при оконных операциях, основанных на времени.

Язык кода:javascript
копировать
// Ведрообразование по одному ковшу в час
BucketAssigner<T, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");

Группирование по другим условиям (Пользовательское) Formatting): Пользователи также могут настраивать форматирование сегментов и организовывать сегменты в соответствии с конкретными условиями.

Язык кода:javascript
копировать
        // Кастомизация
        BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
            @Override
            public String getBucketId(String element, Context context) {
                // Получить отметку времени (в секундах) до того, когда
                long timestamp = System.currentTimeMillis() / 1000;
                // Преобразовать временную метку в LocalDateTime объект
                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
                // Определить формат даты и времени
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
                // Форматдата и времяобъектуказать Форматизнить
                return formatter.format(dateTime);
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        };

05 Стратегия сжатия (Сжатие)

FileSink от Flink также поддерживает сжатие данных при записи файлов, чтобы уменьшить объем памяти и повысить эффективность передачи.

**forRowFormat В линейном режиме: **Настраиваемое содержимое ограничено внутренней частью файла. Сжимать файл и выполнять другие операции сложно. forBulkFormat в режиме столбца: **Вы можете не только работать с файлами внутри, но также легко выполнять сжатие файлов и другие операции;

Благодаря этим параметрам конфигурации форматирования и сжатия FileSink обеспечивает гибкость, позволяя пользователям выбирать подходящий способ организации выходных файлов и контролировать размер файлов в зависимости от конкретных потребностей.

Без сжатия (Нет Compression): По умолчанию FileSink не сжимает данные.

Язык кода:javascript
копировать
// Не проводить стягивание
FileSink<String> sink = FileSink
    .forRowFormat(outputPath, new SimpleStringEncoder<T>("UTF-8"))
    .withBucketAssigner(customBucketAssigner)
    .withRollingPolicy(rollingPolicy)
    .withOutputFileConfig(outputFileConfig)
    .build();

Gzip сжатие: FileSink поддерживает использование алгоритма Gzip для сжатия выходных файлов.

Язык кода:javascript
копировать
package com.aurora.demo.FileSink;

import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;

/**
 * Описание: Алгоритм сжатия пользовательского файла шаблона колонн.
 *
 * @author Кот Асакусы
 * @version 1.0.0
 * @date 2024-02-08 01:20:31
 */
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {

    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
        return new BulkWriter<>() {
            @Override
            public void addElement(String element) throws IOException {
                gzipOutput.write(element.getBytes());
            }

            @Override
            public void flush() throws IOException {
                gzipOutput.flush();
            }

            @Override
            public void finish() throws IOException {
                gzipOutput.close();
            }
        };
    }
}

Другие алгоритмы сжатия: Помимо Gzip, FileSink также поддерживает другие алгоритмы сжатия. Вы также можете получить их, непосредственно просмотрев пакет исходного кода, например алгоритм Snappy. Вы можете выбрать подходящий алгоритм сжатия в соответствии с вашими потребностями.

Язык кода:javascript
копировать
commons-compress-1.21.jar
META-INF
org
apache
commons
compress
archivers
changes
compressors
brotli
bzip2
deflate
deflate64
gzip
lz4
lz77support
lzma
lzw
pack200
snappy
xz
z
zstandard
CompressorException
CompressorInputStream
CompressorOutputStream
CompressorStreamFactory
CompressorStreamProvider
FileNameUtil
harmony
java
parallel
utils
MemoryLimitException
PasswordRequiredException

06 Стратегия именования файлов (OutputFileConfig)

6.1 Типы файлов

Файлы в статусе «Завершено» и «В работе» можно отличить только по имени.

  • In-progress / Pending:part–.inprogress.uid
  • Finished:part-- когда Sink Subtask При создании экземпляра это uid назначен на Subtask случайных ID ценить. этот uid не имеет механизма отказоустойчивости, поэтому, когда Subtask от Восстановление, uid будет перегенерирован.

6.2 Настройка файловых суффиксов и суффиксов

Flink позволяет пользователям добавлять префикс и/или суффикс к имени файла детали. Используйте OutputFileConfig для выполнения вышеуказанных функций. Например, Sink добавит префикс «prefix» и суффикс «.ext» к имени созданного файла, как показано ниже:

Язык кода:javascript
копировать
└── 2019-08-25--12
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
    ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
    ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
    └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11

Реализация кода показана на рисунке.

Язык кода:javascript
копировать
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();

07 Стратегия объединения файлов (компактная)

от 1.15 начало версии FileSink Поддержка Start отправлена pending Объединение файлов позволяет приложениям устанавливать меньший период времени и избегать создания большого количества маленьких файлов. Особенно, когда пользователи используют bulk Формат когда: Этот формат требует, чтобы пользователь находился в checkpoint при переключении файлов.

7.1 Метод объединения

enableCompactionOnCheckpoint:Указывает, сколько друг от другаcheckpointсделать слияние,По умолчанию 1

setNumCompactThreads: Сколько потоков использовать для объединения файлов, по умолчанию 1.

FileCompactor:алгоритм слияния

(1) IdenticalFileCompactor: прямое копирование содержимого одного файла в другой файл. Одновременно можно скопировать только один файл;

(2) ConcatFileCompactor: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения.

(3) RecordWiseFileCompactor: имеется множество настраиваемого содержимого.

Язык кода:javascript
копировать
FileSink<String> fileSink = FileSink
//Указываем каталог файла и кодировку записи файла Формат
.forRowFormat(path, new SimpleStringEncoder<String>("UTF-8"))
//Установим стратегию слияния,
.enableCompact(FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build(),new ConcatFileCompactor()).build();

7.2 Принцип слияния

После включения этой функции, когда файл конвертируется в pending Объединение файлов происходит между статусом и окончательной фиксацией файла. Эти pending файл состояния сначала будет отправлен в виде . начало Временные файлы. Затем эти файлы будут объединены в соответствии с указанной пользователем стратегией и методом слияния для создания объединенного файла. pending файл состояния. Эти файлы затем будут отправлены на Committer и отправьте его как официальный документ. После этого исходный временный файл также будет удален.

При включении функции объединения файлов пользователю необходимо указать FileCompactStrategy и FileCompactor

FileCompactStrategy Укажите, когда и какие файлы будут объединены. На данный момент существуют два параллельных условия: размер целевого файла и интервал. Checkpoint количество. Когда общий размер кэшированных в данный момент файлов достигает указанного порога или количества секунд, прошедших с момента последнего слияния. Checkpoint Когда количество раз достигнет указанного числа, FileSink Будет создана асинхронная задача для объединения текущих кэшированных файлов.

FileCompactor Указывает, как объединить файлы, соответствующие заданному списку путей, и записать результаты в файл. В зависимости от того, как записан файл, его можно разделить на две категории:

  • OutputStreamBasedFileCompactor : Пользователь записывает объединенные результаты в выходной поток. Обычно, когда пользователь не хочет или не может читать записи во входном файле, используйте. это тип CompactingFileWriter Примером является ConcatFileCompactor , который напрямую объединяет данные файлы и записывает результаты в выходной поток.
  • RecordWiseFileCompactor : этот тип CompactingFileWriter Входной файл будет считан один за другим.из Зарегистрировать пользователя,тогда иFileWriterТакой жеписатьв выходном файле。CompactingFileWriter Примером является RecordWiseFileCompactor , который читает записи из данного файла и записывает их в CompactingFileWriter середина. Пользователю необходимо указать, как считывать записи из исходного файла.

Примечание 1 После включения функции объединения файлов,При необходимости закройте его позже,Должно быть, строитсяFileSinkЯвно вызывается, когдаdisableCompactметод。

Примечание 2 Если объединение файлов включено, время видимости файлов будет увеличено.

08 Механизм восстановления после сбоя

В распределенной среде отказ узла является распространенной ситуацией. FileSink имеет механизм восстановления после сбоя, который может перезапускаться и продолжать запись данных после сбоя узла, обеспечивая целостность и надежность данных. Вот некоторые ключевые механизмы:

  1. Контрольно-пропускные пункты: Flinkиспользоватьконтрольно-пропускной пунктмеханизмдля достижения отказоустойчивости。контрольно-пропускной пунктда Статус задачиизпоследовательный снимок,Его можно использовать для восстановления предыдущего состояния в случае сбоя задачи. FileSink запишет статус файла, записанный до этого во время контрольной точки.,Это обеспечивает правильное восстановление с контрольной точки при перезапуске задачи.
  2. Семантика «Ровно один раз»: FlinkподдерживатьExactly-OnceСемантика,Это надежная гарантия последовательности. Обычно это зависит от транзакционных гарантий базовой системы хранения данных. FileSink может поддерживать семантику Exactly-Once.,чтобы обеспечитьсуществовать Сбой задачи и восстановлениеизслучайданныене будет повторятьсяписатьили потерян。
  3. Write-Ahead Log(WAL): FileSink иногда использует упреждающую запись Log(WAL)записатьписатьдействовать,Аналогично журналу транзакций библиотеки данных. Это облегчает возможность возврата к согласованному состоянию в случае сбоя задачи. WAL обычно содержит записи операций, которые были записаны,Может использоваться для повторной подачи заявки при возобновлении выполнения задачи.
  4. Идемпотент пишет: В некоторых случаях логика записи FileSink спроектирована как идемпотентная. Это означает, что одну и ту же операцию записи можно безопасно выполнять повторно, не приводя к противоречивым результатам. При идемпотентном проектировании конечный результат одинаков даже при сбоях и перезапусках.

09 Практические сценарии применения

Apache FlinkизFileSin(НапримерBucketingSink)основнойдля приложений потоковой обработкиизрезультатписатьраспределенныйфайловая система。нижедаFileSinkПрактические сценарии Некоторые примеры применения:

  1. Пакетный экспорт данных: Когда вам необходимо экспортировать данные, обработанные приложением потоковой обработки, в распределенные файлы в пакетном режиме. системачас,FileSinkдаобщийизвыбирать。Например,Вы можете записать совокупные результаты за определенный временной интервал в файл.,для последующего анализа или автономной обработки.
  2. Обработка и архивирование журналов: существовать Реальностьчас В сценарии обработки журнала,FileSink можно использовать для записи обработанных данных журнала в файл.,для длительного хранения или дальнейшего анализа. Вы можете разделить файлы журналов на разные каталоги или файлы по времени, типу события и другим критериям.
  3. Формирование отчетов в реальном времени: когдатыиз流处理应用程序生成Реальностьчас报表或分析результатчас,FileSink可以将这些результат以可查询из Форматписатьфайловая система. Это делает отчеты или результаты анализа более удобными для автономного запроса, совместного использования и долгосрочного хранения.
  4. данныерезервное копированиеикопировать: Если вам нужно распределить его по разным файлам Между резервным копированием или копированием данных системы FileSin можно использовать для записи вывода потокового приложения в несколько мест назначения. система. Это обеспечивает эффективные средства поддержания согласованности нескольких копий данных.
  5. Интеграция данных: существоватьданные Сценарии интеграции и обмена,FileSinkможно использовать как общийиз Выход,Выведите обработанные данные в виде файла. Это делает обмен данными между различными системами более гибким.,Потому что файл да — это универсальный Формат обмена данными.

в этих сценариях,FileSinkиз Конфигурация Параметры(Например, путь к файлу、Параметры форматирования、Стратегия разделения и т. д.)Может основываться на конкретныхиз Нужно настроить,для удовлетворения требований различных приложений. пожалуйста, обрати внимание,Реальные сценарии применения, возможно, придется адаптировать в соответствии с конкретными потребностями бизнеса и особенностями данных.

Демонстрация интеграции 10 проектов

10.1 зависимость от помпа

Язык кода:javascript
копировать
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink_connector_file</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--Настройки недвижимости-->
    <properties>
        <!--java_JDKВерсия-->
        <java.version>11</java.version>
        <!--mavenПлагин упаковки-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--Скомпилированная кодировкаUTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--Кодировка выходного отчетаUTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <!--Общие зависимости-->
    <dependencies>
        <!-- флинк читает текст Зависимость файла файла start-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- флинк читает текст Зависимость файла файла end-->

        <!-- фликнуть основные зависимости start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- фликнуть основные зависимости end -->

    </dependencies>

    <!--Компилируем и упаковываем-->
    <build>
        <finalName>${project.name}</finalName>
        <!--Упаковка файлов ресурсов-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.aurora.KafkaStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--Единое управление плагинами-->
        <pluginManagement>
            <plugins>
                <!--mavenПлагин упаковки-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--компилировать Плагин упаковки-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

10.2 Работа в режиме строки forRowFormat

Язык кода:javascript
копировать
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.UUID;

/**
 * Описание: flink интегрирует FileSink, режим строки forRowFormat.
 *
 * @author Кот Асакусы
 * @version 1.0.0
 * @date 2024-02-07 16:11:50
 */
public class FileRowSinkStreaming {

    public static void main(String[] args) throws Exception {

        //=============1. Стратегия корзины=========================== = ==========
        // Пользовательская стратегия группирования
        BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
            @Override
            public String getBucketId(String element, Context context) {
                // Получить отметку времени (в секундах) до того, когда
                long timestamp = System.currentTimeMillis() / 1000;
                // Преобразовать временную метку в LocalDateTime объект
                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
                // Определить формат даты и времени
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
                // Форматдата и времяобъектуказать Форматизнить
                return formatter.format(dateTime);
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        };
        // Стратегия группирования окон по умолчанию на основе времени
//        BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");

        //==============2.Роллинговая стратегия============================ == ================
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
                //Он прокрутится через 15 минут
                .withRolloverInterval(Duration.ofMinutes(15))
                //от прокрутки, если не получены новые записи за пределами 5-минутной задержки
                .withInactivityInterval(Duration.ofMinutes(5))
                //Размер файла достигнут 1GB затем прокрутите
                .withMaxPartSize(MemorySize.ofMebiBytes(1024))
                .build();

        //===============3. Стратегия именования файлов=========================. = =======
        OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();

        //================4. Стратегия слияния=========================. ==========
        FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
        //Алгоритм слияния, 3 типа
        //Тип 1: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения
        ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
        //Тип 2: напрямую копировать содержимое одного файла в другой файл, только по одному файлу за раз;
//        IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
        //Тип 3: Больше пользовательского контента
//        RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
//            @Override
//            public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
//                //Нужно настроить
//                return null;
//            }
//        };
//        RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);

        // CreateFileSink
        FileSink<String> fileSink = FileSink
                //Указываем каталог файла и кодировку записи файла Формат
                .forRowFormat(new Path("D:\\flink"), new SimpleStringEncoder<String>("UTF-8"))
                //Устанавливаем стратегию слияния
                .enableCompact(fileCompactStrategy, fileCompactor)
                //Стратегия группирования, если стратегия группирования по умолчанию не установлена, DateTimeBucketAssigner, распределитель на основе времени, генерирует один сегмент каждый час, то есть гггг-мм-дд-чч
                .withBucketAssigner(customBucketAssigner)
                //Указываем префикс файла и выходной суффикс
                .withOutputFileConfig(outputFileConfig)
                //Стратегия прокрутки по умолчанию, как часто объединять временные файлы
//                .withBucketCheckInterval(1000)
                //Настройте стратегию прокрутки. Если какое-либо из трех условий выполнено, оно будет прокручиваться.
                .withRollingPolicy(rollingPolicy).build();

        // создавать среда выполнения
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // создать простой источник данных в пакетном режиме
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("Тест эксплуатации и обслуживания", «Развитие эксплуатации и технического обслуживания», «Мальчик, гоняющийся за ветром»);

        // Запишите все источники данных в файл. Обратите внимание, что после включения объединения файлов необходимо установить uid, иначе при непосредственном запуске будет сообщено об ошибке.
        dataStreamSource.sinkTo(fileSink).uid("1");

        // выполнять задания
        env.execute("FileRowSinkStreaming");
    }
}

10.3 Настройка алгоритма сжатия файлов в режиме столбца

Язык кода:javascript
копировать
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;

/**
 * Описание: Алгоритм сжатия пользовательского файла шаблона колонн.
 *
 * @author Кот Асакусы
 * @version 1.0.0
 * @date 2024-02-08 01:20:31
 */
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {

    @Override
    public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
        GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
        return new BulkWriter<>() {
            @Override
            public void addElement(String element) throws IOException {
                gzipOutput.write(element.getBytes());
            }

            @Override
            public void flush() throws IOException {
                gzipOutput.flush();
            }

            //Примечание: этот метод не может закрыть поток, переданный Factory, это делается платформой да! ! ! В противном случае программа напрямую сообщит об ошибке.
            @Override
            public void finish() throws IOException {
//                gzipOutput.close();
            }
        };
    }
}

10.4 Задание режима столбца для BulkFormat

Язык кода:javascript
копировать
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

/**
 * Описание: flink интегрирует режим столбца FileSink, forBulkFormat.
 *
 * @author Кот Асакусы
 * @version 1.0.0
 * @date 2024-02-07 16:11:50
 */
public class FileBulkSinkStreaming {

    public static void main(String[] args) throws Exception {

        //=============1. Стратегия корзины=========================== = ==========
        // Пользовательская стратегия группирования
        BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
            @Override
            public String getBucketId(String element, Context context) {
                // Получить отметку времени (в секундах) до того, когда
                long timestamp = System.currentTimeMillis() / 1000;
                // Преобразовать временную метку в LocalDateTime объект
                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
                // Определить формат даты и времени
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
                // Форматдата и времяобъектуказать Форматизнить
                return formatter.format(dateTime);
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        };
        // Стратегия группирования окон по умолчанию на основе времени
//        BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");

        //==============2.Роллинговая стратегия============================ == ================
        CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
            @Override
            public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
                return false;
            }

            @Override
            public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
                return false;
            }
        };

        //===============3. Стратегия именования файлов=========================. = =======
        OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();

        //================4. Стратегия слияния=========================. ==========
        FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
        //Алгоритм слияния, 3 типа
        //Тип 1: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения
        ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
        //Тип 2: напрямую копировать содержимое одного файла в другой файл, только по одному файлу за раз;
//        IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
        //Тип 3: Больше пользовательского контента
//        RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
//            @Override
//            public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
//                //Нужно настроить
//                return null;
//            }
//        };
//        RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);

        // CreateFileSink
        FileSink<String> fileSink = FileSink
                //Указываем каталог файла и кодировку записи файла Формат
                .forBulkFormat(new Path("D:\\flink"), new CustomBulkWriterFactory())
                //Устанавливаем стратегию слияния
                .enableCompact(fileCompactStrategy, fileCompactor)
                //Стратегия группирования, если стратегия группирования по умолчанию не установлена, DateTimeBucketAssigner, распределитель на основе времени, генерирует один сегмент каждый час, то есть гггг-мм-дд-чч
                .withBucketAssigner(customBucketAssigner)
                //Указываем префикс файла и выходной суффикс
                .withOutputFileConfig(outputFileConfig)
                //Стратегия прокрутки по умолчанию, как часто объединять временные файлы
//                .withBucketCheckInterval(1000)
                //Настройте стратегию прокрутки. Если какое-либо из трех условий выполнено, оно будет прокручиваться.
                .withRollingPolicy(rollingPolicy)
                .build();

        // создавать среда выполнения
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // создать простой источник данных в пакетном режиме
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("Тест эксплуатации и обслуживания", «Развитие эксплуатации и технического обслуживания», «Мальчик, гоняющийся за ветром»);

        // Запишите все источники данных в файл. Обратите внимание, что после включения объединения файлов необходимо установить uid, иначе при непосредственном запуске будет сообщено об ошибке.
        dataStreamSource.sinkTo(fileSink).uid("2");

        // выполнять задания
        env.execute("FileRowSinkStreaming");
    }
}

11 Резюме

FileSink — это основной компонент Apache Flink для записи данных в файлы. Подробно представив его основные концепции, детали реализации и пример кода, каждый сможет получить более полное представление о нем. Я надеюсь, что эта информация поможет каждому лучше понять и применить FileSink, а также осознать необходимость записи данных в файлы в реальных проектах. Комментарии и обмен мнениями приветствуются! ! !

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.