1. Флинк Столбец
Flink СтолбецСистема вводит определенную точку знаний,Это объясняется на конкретных примерах.
2. Флинк Пример Столбец
Пример Флинка Столбецда Flink Полезная инструкция для Столбца,Информация по очкам знаний обычно не вводится.,Речь идет скорее о предоставлении примеров, которые можно использовать конкретно. Этот Столбец больше не разделен на каталоги.,С содержанием вступления можно ознакомиться по ссылке.
два Столбец Все клики при входе в статью:Серия статей Сводный указатель Flink
@TOC
В этой статье в основном представлены три часто используемых оператора Flink (карта, плоская карта и фильтр) и объяснены их конкретные примеры выполнения.
Если вам нужно знать больше,Можно сделать личноFlink СтолбецУзнайте больше об обновлении вашей системы。
В этой статье нет других зависимостей, кроме зависимостей maven.
Данная тема разделена на пять частей, а именно:
Во всех примерах ниже используется эта зависимость maven, если не указано иное.
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- бревно -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
Следующий Пользователь зависит от следующего:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private int id;
private String name;
private String pwd;
private String email;
private int age;
private double balance;
}
DataStream->DataStream
Это одно из простейших преобразований, где входом является поток данных, а выходом также является поток данных.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;
/**
* @author alanchan
*
*/
public class TestMapDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// source
// transformation
mapFunction5(env);
// sink
// execute
env.execute();
}
// Создайте список, затем умножьте числа в списке на 2 и выведите его, реализованный внутренним анонимным классом.
public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {
List<Integer> data = new ArrayList<Integer>();
for (int i = 1; i <= 10; i++) {
data.add(i);
}
DataStreamSource<Integer> source = env.fromCollection(data);
SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer inValue) throws Exception {
return inValue * 2;
}
});
sink.print();
// 9> 12
// 4> 2
// 10> 14
// 8> 10
// 13> 20
// 7> 8
// 12> 18
// 11> 16
// 5> 4
// 6> 6
}
// Создайте список, а затем умножьте числа в списке на 2 для вывода, реализованного лямбда-выражением.
public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {
List<Integer> data = new ArrayList<Integer>();
for (int i = 1; i <= 10; i++) {
data.add(i);
}
DataStreamSource<Integer> source = env.fromCollection(data);
SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);
sink.print();
// 3> 4
// 4> 6
// 9> 16
// 7> 12
// 10> 18
// 2> 2
// 6> 10
// 5> 8
// 8> 14
// 11> 20
}
// Создать источник данных пользователя
public static DataStreamSource<User> source(StreamExecutionEnvironment env) {
DataStreamSource<User> source = env.fromCollection(Arrays.asList(
new User(1, "alan1", "1", "1@1.com", 12, 1000),
new User(2, "alan2", "2", "2@2.com", 19, 200),
new User(3, "alan1", "3", "3@3.com", 28, 1500),
new User(5, "alan1", "5", "5@5.com", 15, 500),
new User(4, "alan2", "4", "4@4.com", 30, 400)
)
);
return source;
}
// Лямбда реализует функции Balance×2 и Age+5 пользовательского объекта.
public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<User> source = source(env);
SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {
User user2 = user;
user2.setAge(user.getAge() + 5);
user2.setBalance(user.getBalance() * 2);
return user2;
});
sink.print();
// 10> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 14> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 11> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 12> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 13> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
return sink;
}
// После того как лямбда реализует баланс*2 и возраст+5, данные баланса》=2000 и возраста》=20 отфильтровываются.
public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {
SingleOutputStreamOperator<User> sink = mapFunction3(env).filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);
sink.print();
// 15> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 16> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 3> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
// 1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
return sink;
}
// После того как лямбда реализует баланс*2 и возраст+5, данные баланса》=2000 и возраста》=20 отфильтровываются и собираются через Flatmap.
public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {
SingleOutputStreamOperator<User> sink = mapFunction4(env).flatMap((FlatMapFunction<User, User>) (user, out) -> {
if (user.getBalance() >= 3000) {
out.collect(user);
}
}).returns(User.class);
sink.print();
// 8> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 6> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 9> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 5> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
return sink;
}
}
DataStream->DataStream
FlatMap принимает запись и выводит ноль, одну или несколько записей. Преобразует каждый элемент коллекции в один или несколько элементов и возвращает сведенный результат.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author alanchan
*
*/
public class TestFlatMapDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
flatMapFunction3(env);
env.execute();
}
// Создать источник данных пользователя
public static DataStreamSource<String> source(StreamExecutionEnvironment env) {
List<String> info = new ArrayList<>();
info.add("i am alanchan");
info.add("i like hadoop");
info.add("i like flink");
info.add("and you ?");
DataStreamSource<String> dataSource = env.fromCollection(info);
return dataSource;
}
// Разделение предложений пробелами — реализация внутреннего анонимного класса
public static void flatMapFunction1(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> source = source(env);
SingleOutputStreamOperator<String> sink = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] splits = value.split(" ");
for (String split : splits) {
out.collect(split);
}
}
});
sink.print();
// 11> and
// 10> i
// 8> i
// 9> i
// 8> am
// 10> like
// 11> you
// 10> flink
// 8> alanchan
// 9> like
// 11> ?
// 9> hadoop
}
// реализация лямбды
public static void flatMapFunction2(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> source = source(env);
SingleOutputStreamOperator<String> sink = source.flatMap((FlatMapFunction<String, String>) (input, out) -> {
String[] splits = input.split(" ");
for (String split : splits) {
out.collect(split);
}
}).returns(String.class);
sink.print();
// 6> i
// 8> and
// 8> you
// 8> ?
// 5> i
// 7> i
// 5> am
// 5> alanchan
// 6> like
// 7> like
// 6> hadoop
// 7> flink
}
// реализация лямбды
public static void flatMapFunction3(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<String> source = source(env);
SingleOutputStreamOperator<String> sink = source.flatMap((String input, Collector<String> out) -> Arrays.stream(input.split(" ")).forEach(out::collect))
.returns(String.class);
sink.print();
// 8> i
// 11> and
// 10> i
// 9> i
// 10> like
// 11> you
// 8> am
// 11> ?
// 10> flink
// 9> like
// 9> hadoop
// 8> alanchan
}
}
DataStream → DataStream
Функция «Фильтр» определяет результат на основе условий. Отфильтруйте элементы в коллекции в соответствии с указанными условиями и отфильтруйте элементы, которые возвращают true/соответствуют условиям.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;
/**
* @author alanchan
*
*/
public class TestFilterDemo {
// Создать источник данных пользователя
public static DataStreamSource<User> sourceUser(StreamExecutionEnvironment env) {
DataStreamSource<User> source = env.fromCollection(Arrays.asList(
new User(1, "alan1", "1", "1@1.com", 12, 1000),
new User(2, "alan2", "2", "2@2.com", 19, 200),
new User(3, "alan1", "3", "3@3.com", 28, 1500),
new User(5, "alan1", "5", "5@5.com", 15, 500),
new User(4, "alan2", "4", "4@4.com", 30, 400)));
return source;
}
// Создать источник данных пользователя
public static DataStreamSource<Integer> sourceList(StreamExecutionEnvironment env) {
List<Integer> data = new ArrayList<Integer>();
for (int i = 1; i <= 10; i++) {
data.add(i);
}
DataStreamSource<Integer> source = env.fromCollection(data);
return source;
}
// Отфильтровать числа больше 5, внутренний анонимный класс
public static void filterFunction1(StreamExecutionEnvironment env) throws Exception {
DataStream<Integer> source = sourceList(env);
SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {
public Integer map(Integer value) throws Exception {
return value + 1;
}
}).filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value > 5;
}
});
sink.print();
// 1> 10
// 14> 7
// 16> 9
// 13> 6
// 2> 11
// 15> 8
}
// реализация лямбды
public static void filterFunction2(StreamExecutionEnvironment env) throws Exception {
DataStream<Integer> source = sourceList(env);
SingleOutputStreamOperator<Integer> sink = source.map(i -> i + 1).filter(value -> value > 5);
sink.print();
// 12> 7
// 15> 10
// 11> 6
// 13> 8
// 14> 9
// 16> 11
}
// Запросить пользователя Записи с идентификатором больше 3
public static void filterFunction3(StreamExecutionEnvironment env) throws Exception {
DataStream<User> source = sourceUser(env);
SingleOutputStreamOperator<User> sink = source.filter(user -> user.getId() > 3);
sink.print();
// 14> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
// 15> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
filterFunction3(env);
env.execute();
}
}
В этой статье в основном представлены три часто используемых оператора Flink и объяснены их конкретные примеры выполнения.
Если вам нужно знать больше,Можно сделать личноFlink СтолбецУзнайте больше об обновлении вашей системы。
Данная тема разделена на пять частей, а именно: