На работе мы обычно используем инструмент разработки IntelliJ IDEA для разработки кода. Чтобы облегчить и быстро отладить Flink и понять статус работы программы Flink, мы надеемся, что веб-интерфейс можно будет просмотреть при запуске Flink в локальном инструменте разработки. чтобы мы могли написать программу Flink, когда локальный веб-интерфейс включен.
Перед Flink1.15 добавьте зависимость соответствующей версии Scala в проект Java Flink или проект Scala Flink в соответствии с используемой версией Scala.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
После версии Flink1.15, будь то проект Java Flink или проект Scala Flink, добавление следующих зависимостей не требует дополнительной зависимости от версии Scala.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
Java-код Flink запускает локальный WebUI:
Configuration conf = new Configuration();
//Устанавливаем локальный порт, привязанный к WebUI
conf.setString(RestOptions.BIND_PORT,"8081");
//Используем конфигурацию
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Код Flink Scala запускает локальный WebUI:
val configuration = new Configuration()
//Устанавливаем локальный порт, привязанный к WebUI
configuration.set(RestOptions.BIND_PORT,"8081")
//Используем конфигурацию
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
Пример Java-кода:
//1. Использовать локальный режим.
Configuration conf = new Configuration();
//Устанавливаем локальный порт, привязанный к WebUI
conf.setString(RestOptions.BIND_PORT,"8081");
//Используем конфигурацию
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
//2. Чтение данных сокета
DataStreamSource<String> ds = env.socketTextStream("node3", 9999);
//3. Подготовьте данные формата K, V.
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
String[] words = line.split(",");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
//4. Совокупные результаты печати.
tupleDS.keyBy(tp -> tp.f0).sum(1).print();
//5.execute запускает выполнение
env.execute();
Код для запуска:
Пример кода Scala:
//1. Создаём локальную среду WebUI.
val configuration = new Configuration()
//Устанавливаем привязанный локальный порт
configuration.set(RestOptions.BIND_PORT,"80")
//Первый метод настройки
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
//2.Scala Неявное преобразование импорта потоковой обработки с использованием Scala API Неявное преобразование необходимо для вывода типа после операции функции.
import org.apache.flink.streaming.api.scala._
//3. Чтение данных сокета
val linesDS: DataStream[String] = env.socketTextStream("node3", 9999)
//4. Выполнение статистики WordCount.
linesDS.flatMap(line=>{line.split(",")})
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
//5.Наконец используем выполнение Метод запускает выполнение
env.execute()
Доступ к любому из вышеперечисленных кодов можно получить через:http://localhostПриходите и посмотритеWebUI。
Примечание. Прежде чем запускать код, сначала запустите службу Socket на узле 3, а затем запустите код. После импорта зависимости flink-runtime-web лучше всего перезапустить инструмент разработки и перезагрузить соответствующий пакет зависимостей. В противном случае при доступе к локальному файлу могут возникнуть ошибки «{"errors":["Not Found: /"]}". WebUI после выполнения кода Доступ к WebUI невозможен.