FileSink — это функция Sink в Flink, которая используется для вывода обработанных данных в файловую систему. Он способен обрабатывать потоки данных в реальном времени и предоставляет гибкие возможности конфигурации, которые позволяют пользователям определять формат, путь и стратегию записи выходных файлов.
FileSink — это функция Sink в Apache Flink, которая используется для вывода данных результатов потоковой обработки в файловую систему. Этот принцип включает в себя модель обработки потока данных Flink и операции файловой системы.
В целом, принципы FileSink включают буферизацию и пакетную обработку потоков данных, группирование данных, настройку стратегии записи, поддержку транзакций, восстановление после сбоев и операции с файловой системой. Благодаря сочетанию этих механизмов можно добиться записи данных в файловую систему. эффективно и надежно.
В Apache Flink FileSink — это оператор вывода, используемый для записи данных в файл. FileSink предоставляет некоторые параметры конфигурации политики смены (Rolling Policy), которые используются для управления сменой выходного файла. Стратегия прокрутки определяет, когда создаются новые файлы, как определяются имена файлов и когда закрываются старые файлы. Ниже приведены распространенные стратегии роллинга во Flink и их использование:
Роллинг по времени Policy by Time): Эта стратегия перемещает файлы на основе временных интервалов, обычно на основе временных меток обработки или фиксированных временных окон.
// Например, настройка, которая прокручивается каждый час
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withRolloverInterval(TimeUnit.HOURS.toMillis(1))
.build();
Прокатка по размеру Policy by Size): Эта стратегия распределяет файлы в зависимости от их размера и обычно используется для ограничения максимального размера каждого файла.
// Например, максимальный размер каждого файла составляет 100. настройки МБ
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024 * 1024 * 100) // 100 MB
.build();
Прокатка по состоянию (Прокатка Policy by Condition): Эта стратегия выполняет смену файлов на основе определенных условий, например, смену файлов при выполнении определенного количества событий или пользовательского условия.
// Например, настройка, которая прокручивает каждые 1000 событий.
RollingPolicy<T> rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(1024 * 1024 * 100) // 100 MB
.withMaxPartSize(1000) // Делайте ролл каждые 1000 событий
.build();
Прокрутка по снимку Policy by checkPoint):основнойдапротивforBulkFormatрежим столбцаизскользящая стратегия,довольно особенный
CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
return false;
}
};
Эти стратегии прокатки можно комбинировать и настраивать в зависимости от конкретных потребностей. Например, вы можете настроить прокручиваемые файлы по времени и размеру, чтобы гарантировать, что выходной файл соответствует требованиям времени, но не превышает определенный предел размера. Стратегия прокатки FileSink предоставляет гибкие возможности конфигурации для удовлетворения потребностей различных сценариев вывода.
В Flink FileSink использует концепцию Bucket для организации файлов и управления ими. Так называемая корзина относится к папке, в которую должны помещаться данные.
Группирование по времени Formatting): Сегменты можно форматировать по времени, что полезно при оконных операциях, основанных на времени.
// Ведрообразование по одному ковшу в час
BucketAssigner<T, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
Группирование по другим условиям (Пользовательское) Formatting): Пользователи также могут настраивать форматирование сегментов и организовывать сегменты в соответствии с конкретными условиями.
// Кастомизация
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// Получить отметку времени (в секундах) до того, когда
long timestamp = System.currentTimeMillis() / 1000;
// Преобразовать временную метку в LocalDateTime объект
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// Определить формат даты и времени
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// Форматдата и времяобъектуказать Форматизнить
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
FileSink от Flink также поддерживает сжатие данных при записи файлов, чтобы уменьшить объем памяти и повысить эффективность передачи.
**forRowFormat В линейном режиме: **Настраиваемое содержимое ограничено внутренней частью файла. Сжимать файл и выполнять другие операции сложно. forBulkFormat в режиме столбца: **Вы можете не только работать с файлами внутри, но также легко выполнять сжатие файлов и другие операции;
Благодаря этим параметрам конфигурации форматирования и сжатия FileSink обеспечивает гибкость, позволяя пользователям выбирать подходящий способ организации выходных файлов и контролировать размер файлов в зависимости от конкретных потребностей.
Без сжатия (Нет Compression): По умолчанию FileSink не сжимает данные.
// Не проводить стягивание
FileSink<String> sink = FileSink
.forRowFormat(outputPath, new SimpleStringEncoder<T>("UTF-8"))
.withBucketAssigner(customBucketAssigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig)
.build();
Gzip сжатие: FileSink поддерживает использование алгоритма Gzip для сжатия выходных файлов.
package com.aurora.demo.FileSink;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.IOException;
/**
* Описание: Алгоритм сжатия пользовательского файла шаблона колонн.
*
* @author Кот Асакусы
* @version 1.0.0
* @date 2024-02-08 01:20:31
*/
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {
@Override
public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
return new BulkWriter<>() {
@Override
public void addElement(String element) throws IOException {
gzipOutput.write(element.getBytes());
}
@Override
public void flush() throws IOException {
gzipOutput.flush();
}
@Override
public void finish() throws IOException {
gzipOutput.close();
}
};
}
}
Другие алгоритмы сжатия: Помимо Gzip, FileSink также поддерживает другие алгоритмы сжатия. Вы также можете получить их, непосредственно просмотрев пакет исходного кода, например алгоритм Snappy. Вы можете выбрать подходящий алгоритм сжатия в соответствии с вашими потребностями.
commons-compress-1.21.jar
META-INF
org
apache
commons
compress
archivers
changes
compressors
brotli
bzip2
deflate
deflate64
gzip
lz4
lz77support
lzma
lzw
pack200
snappy
xz
z
zstandard
CompressorException
CompressorInputStream
CompressorOutputStream
CompressorStreamFactory
CompressorStreamProvider
FileNameUtil
harmony
java
parallel
utils
MemoryLimitException
PasswordRequiredException
Файлы в статусе «Завершено» и «В работе» можно отличить только по имени.
Flink позволяет пользователям добавлять префикс и/или суффикс к имени файла детали. Используйте OutputFileConfig для выполнения вышеуказанных функций. Например, Sink добавит префикс «prefix» и суффикс «.ext» к имени созданного файла, как показано ниже:
└── 2019-08-25--12
├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
└── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
Реализация кода показана на рисунке.
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();
от 1.15 начало версии FileSink
Поддержка Start отправлена pending
Объединение файлов позволяет приложениям устанавливать меньший период времени и избегать создания большого количества маленьких файлов. Особенно, когда пользователи используют bulk Формат когда: Этот формат требует, чтобы пользователь находился в checkpoint при переключении файлов.
enableCompactionOnCheckpoint:Указывает, сколько друг от другаcheckpointсделать слияние,По умолчанию 1
setNumCompactThreads: Сколько потоков использовать для объединения файлов, по умолчанию 1.
FileCompactor:алгоритм слияния
(1) IdenticalFileCompactor: прямое копирование содержимого одного файла в другой файл. Одновременно можно скопировать только один файл;
(2) ConcatFileCompactor: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения.
(3) RecordWiseFileCompactor: имеется множество настраиваемого содержимого.
FileSink<String> fileSink = FileSink
//Указываем каталог файла и кодировку записи файла Формат
.forRowFormat(path, new SimpleStringEncoder<String>("UTF-8"))
//Установим стратегию слияния,
.enableCompact(FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build(),new ConcatFileCompactor()).build();
После включения этой функции, когда файл конвертируется в pending
Объединение файлов происходит между статусом и окончательной фиксацией файла. Эти pending
файл состояния сначала будет отправлен в виде .
начало Временные файлы. Затем эти файлы будут объединены в соответствии с указанной пользователем стратегией и методом слияния для создания объединенного файла. pending
файл состояния. Эти файлы затем будут отправлены на Committer и отправьте его как официальный документ. После этого исходный временный файл также будет удален.
При включении функции объединения файлов пользователю необходимо указать FileCompactStrategy и FileCompactor 。
FileCompactStrategy Укажите, когда и какие файлы будут объединены. На данный момент существуют два параллельных условия: размер целевого файла и интервал. Checkpoint количество. Когда общий размер кэшированных в данный момент файлов достигает указанного порога или количества секунд, прошедших с момента последнего слияния. Checkpoint Когда количество раз достигнет указанного числа, FileSink
Будет создана асинхронная задача для объединения текущих кэшированных файлов.
FileCompactor Указывает, как объединить файлы, соответствующие заданному списку путей, и записать результаты в файл. В зависимости от того, как записан файл, его можно разделить на две категории:
CompactingFileWriter
Примером является ConcatFileCompactor , который напрямую объединяет данные файлы и записывает результаты в выходной поток.CompactingFileWriter
Входной файл будет считан один за другим.из Зарегистрировать пользователя,тогда иFileWriter
Такой жеписатьв выходном файле。CompactingFileWriter
Примером является RecordWiseFileCompactor , который читает записи из данного файла и записывает их в CompactingFileWriter
середина. Пользователю необходимо указать, как считывать записи из исходного файла.Примечание 1 После включения функции объединения файлов,При необходимости закройте его позже,Должно быть, строитсяFileSink
Явно вызывается, когдаdisableCompact
метод。
Примечание 2 Если объединение файлов включено, время видимости файлов будет увеличено.
В распределенной среде отказ узла является распространенной ситуацией. FileSink имеет механизм восстановления после сбоя, который может перезапускаться и продолжать запись данных после сбоя узла, обеспечивая целостность и надежность данных. Вот некоторые ключевые механизмы:
Apache FlinkизFileSin(НапримерBucketingSink
)основнойдля приложений потоковой обработкиизрезультатписатьраспределенныйфайловая система。нижедаFileSinkПрактические сценарии Некоторые примеры применения:
FileSink
даобщийизвыбирать。Например,Вы можете записать совокупные результаты за определенный временной интервал в файл.,для последующего анализа или автономной обработки.FileSink
可以将这些результат以可查询из Форматписатьфайловая система. Это делает отчеты или результаты анализа более удобными для автономного запроса, совместного использования и долгосрочного хранения.FileSink
можно использовать как общийиз Выход,Выведите обработанные данные в виде файла. Это делает обмен данными между различными системами более гибким.,Потому что файл да — это универсальный Формат обмена данными.в этих сценариях,FileSink
из Конфигурация Параметры(Например, путь к файлу、Параметры форматирования、Стратегия разделения и т. д.)Может основываться на конкретныхиз Нужно настроить,для удовлетворения требований различных приложений. пожалуйста, обрати внимание,Реальные сценарии применения, возможно, придется адаптировать в соответствии с конкретными потребностями бизнеса и особенностями данных.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsy</groupId>
<artifactId>aurora_flink_connector_file</artifactId>
<version>1.0-SNAPSHOT</version>
<!--Настройки недвижимости-->
<properties>
<!--java_JDKВерсия-->
<java.version>11</java.version>
<!--mavenПлагин упаковки-->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!--Скомпилированная кодировкаUTF-8-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--Кодировка выходного отчетаUTF-8-->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<!--Общие зависимости-->
<dependencies>
<!-- флинк читает текст Зависимость файла файла start-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.18.0</version>
</dependency>
<!-- флинк читает текст Зависимость файла файла end-->
<!-- фликнуть основные зависимости start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.0</version>
</dependency>
<!-- фликнуть основные зависимости end -->
</dependencies>
<!--Компилируем и упаковываем-->
<build>
<finalName>${project.name}</finalName>
<!--Упаковка файлов ресурсов-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.aurora.KafkaStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!--Единое управление плагинами-->
<pluginManagement>
<plugins>
<!--mavenПлагин упаковки-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!--компилировать Плагин упаковки-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.UUID;
/**
* Описание: flink интегрирует FileSink, режим строки forRowFormat.
*
* @author Кот Асакусы
* @version 1.0.0
* @date 2024-02-07 16:11:50
*/
public class FileRowSinkStreaming {
public static void main(String[] args) throws Exception {
//=============1. Стратегия корзины=========================== = ==========
// Пользовательская стратегия группирования
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// Получить отметку времени (в секундах) до того, когда
long timestamp = System.currentTimeMillis() / 1000;
// Преобразовать временную метку в LocalDateTime объект
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// Определить формат даты и времени
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// Форматдата и времяобъектуказать Форматизнить
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
// Стратегия группирования окон по умолчанию на основе времени
// BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
//==============2.Роллинговая стратегия============================ == ================
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
//Он прокрутится через 15 минут
.withRolloverInterval(Duration.ofMinutes(15))
//от прокрутки, если не получены новые записи за пределами 5-минутной задержки
.withInactivityInterval(Duration.ofMinutes(5))
//Размер файла достигнут 1GB затем прокрутите
.withMaxPartSize(MemorySize.ofMebiBytes(1024))
.build();
//===============3. Стратегия именования файлов=========================. = =======
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();
//================4. Стратегия слияния=========================. ==========
FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
//Алгоритм слияния, 3 типа
//Тип 1: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения
ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
//Тип 2: напрямую копировать содержимое одного файла в другой файл, только по одному файлу за раз;
// IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
//Тип 3: Больше пользовательского контента
// RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
// @Override
// public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
// //Нужно настроить
// return null;
// }
// };
// RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);
// CreateFileSink
FileSink<String> fileSink = FileSink
//Указываем каталог файла и кодировку записи файла Формат
.forRowFormat(new Path("D:\\flink"), new SimpleStringEncoder<String>("UTF-8"))
//Устанавливаем стратегию слияния
.enableCompact(fileCompactStrategy, fileCompactor)
//Стратегия группирования, если стратегия группирования по умолчанию не установлена, DateTimeBucketAssigner, распределитель на основе времени, генерирует один сегмент каждый час, то есть гггг-мм-дд-чч
.withBucketAssigner(customBucketAssigner)
//Указываем префикс файла и выходной суффикс
.withOutputFileConfig(outputFileConfig)
//Стратегия прокрутки по умолчанию, как часто объединять временные файлы
// .withBucketCheckInterval(1000)
//Настройте стратегию прокрутки. Если какое-либо из трех условий выполнено, оно будет прокручиваться.
.withRollingPolicy(rollingPolicy).build();
// создавать среда выполнения
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// создать простой источник данных в пакетном режиме
DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("Тест эксплуатации и обслуживания", «Развитие эксплуатации и технического обслуживания», «Мальчик, гоняющийся за ветром»);
// Запишите все источники данных в файл. Обратите внимание, что после включения объединения файлов необходимо установить uid, иначе при непосредственном запуске будет сообщено об ошибке.
dataStreamSource.sinkTo(fileSink).uid("1");
// выполнять задания
env.execute("FileRowSinkStreaming");
}
}
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.io.IOException;
/**
* Описание: Алгоритм сжатия пользовательского файла шаблона колонн.
*
* @author Кот Асакусы
* @version 1.0.0
* @date 2024-02-08 01:20:31
*/
public class CustomBulkWriterFactory implements BulkWriter.Factory<String> {
@Override
public BulkWriter<String> create(FSDataOutputStream out) throws IOException {
GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(out);
return new BulkWriter<>() {
@Override
public void addElement(String element) throws IOException {
gzipOutput.write(element.getBytes());
}
@Override
public void flush() throws IOException {
gzipOutput.flush();
}
//Примечание: этот метод не может закрыть поток, переданный Factory, это делается платформой да! ! ! В противном случае программа напрямую сообщит об ошибке.
@Override
public void finish() throws IOException {
// gzipOutput.close();
}
};
}
}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.IdenticalFileCompactor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
/**
* Описание: flink интегрирует режим столбца FileSink, forBulkFormat.
*
* @author Кот Асакусы
* @version 1.0.0
* @date 2024-02-07 16:11:50
*/
public class FileBulkSinkStreaming {
public static void main(String[] args) throws Exception {
//=============1. Стратегия корзины=========================== = ==========
// Пользовательская стратегия группирования
BucketAssigner<String, String> customBucketAssigner = new BucketAssigner<>() {
@Override
public String getBucketId(String element, Context context) {
// Получить отметку времени (в секундах) до того, когда
long timestamp = System.currentTimeMillis() / 1000;
// Преобразовать временную метку в LocalDateTime объект
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());
// Определить формат даты и времени
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// Форматдата и времяобъектуказать Форматизнить
return formatter.format(dateTime);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
// Стратегия группирования окон по умолчанию на основе времени
// BucketAssigner<String, String> customBucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
//==============2.Роллинговая стратегия============================ == ================
CheckpointRollingPolicy<String,String> rollingPolicy = new CheckpointRollingPolicy<>() {
@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String s) throws IOException {
return false;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
return false;
}
};
//===============3. Стратегия именования файлов=========================. = =======
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("Flink_").withPartSuffix(".dat").build();
//================4. Стратегия слияния=========================. ==========
FileCompactStrategy fileCompactStrategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).setNumCompactThreads(1).build();
//Алгоритм слияния, 3 типа
//Тип 1: вы можете настроить прямой разделитель между двумя файлами, который передается методом построения
ConcatFileCompactor fileCompactor = new ConcatFileCompactor();
//Тип 2: напрямую копировать содержимое одного файла в другой файл, только по одному файлу за раз;
// IdenticalFileCompactor fileCompactor = new IdenticalFileCompactor();
//Тип 3: Больше пользовательского контента
// RecordWiseFileCompactor.Reader.Factory<String> stringFactory = new RecordWiseFileCompactor.Reader.Factory<>() {
// @Override
// public RecordWiseFileCompactor.Reader<String> createFor(Path path) throws IOException {
// //Нужно настроить
// return null;
// }
// };
// RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor(stringFactory);
// CreateFileSink
FileSink<String> fileSink = FileSink
//Указываем каталог файла и кодировку записи файла Формат
.forBulkFormat(new Path("D:\\flink"), new CustomBulkWriterFactory())
//Устанавливаем стратегию слияния
.enableCompact(fileCompactStrategy, fileCompactor)
//Стратегия группирования, если стратегия группирования по умолчанию не установлена, DateTimeBucketAssigner, распределитель на основе времени, генерирует один сегмент каждый час, то есть гггг-мм-дд-чч
.withBucketAssigner(customBucketAssigner)
//Указываем префикс файла и выходной суффикс
.withOutputFileConfig(outputFileConfig)
//Стратегия прокрутки по умолчанию, как часто объединять временные файлы
// .withBucketCheckInterval(1000)
//Настройте стратегию прокрутки. Если какое-либо из трех условий выполнено, оно будет прокручиваться.
.withRollingPolicy(rollingPolicy)
.build();
// создавать среда выполнения
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// создать простой источник данных в пакетном режиме
DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("Тест эксплуатации и обслуживания", «Развитие эксплуатации и технического обслуживания», «Мальчик, гоняющийся за ветром»);
// Запишите все источники данных в файл. Обратите внимание, что после включения объединения файлов необходимо установить uid, иначе при непосредственном запуске будет сообщено об ошибке.
dataStreamSource.sinkTo(fileSink).uid("2");
// выполнять задания
env.execute("FileRowSinkStreaming");
}
}
FileSink — это основной компонент Apache Flink для записи данных в файлы. Подробно представив его основные концепции, детали реализации и пример кода, каждый сможет получить более полное представление о нем. Я надеюсь, что эта информация поможет каждому лучше понять и применить FileSink, а также осознать необходимость записи данных в файлы в реальных проектах. Комментарии и обмен мнениями приветствуются! ! !