Spark RDD читает и записывает ES
Spark Streaming записывает в ES
Elaticsearch-7.14.2
Spark-3.2.1
jdk-1.8
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qcloud.abi</groupId>
<artifactId>esspark</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-30_2.12</artifactId>
<version>7.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!--Укажите расположение файла записи-->
<mainClass>com.xx.TestMain</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
public class ReadES {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[2]")
.set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
.set("es.port", "9200")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", "passwd")
.set("es.nodes.wan.only", "true")
.set("es.nodes.discovery","false")
.set("es.input.use.sliced.partitions","false")
.set("es.resource", "spark_write")
.set("es.scroll.size","500");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
for ( Map<String, Object> item : rdd.values().collect()) {
System.out.println(item);
}
sc.stop();
}
}
Чтение и запись Spark ES также поддерживает формат JSON.
//Читать напрямую
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
//Вложенный формат данных ES
{test={data=39.0, feature1=1.39, feature2=0.78, feature3=-0.83}}
//Выбираем формат JSON
JavaPairRDD<String, String> rdd = JavaEsSpark.esJsonRDD(sc);
//формат данных JSON
{"test":{"data":50.0,"feature1":1.5,"feature2":1.0,"feature3":-0.5}}
public class SparkWriteEs {
public static void main(String[] args) {
//Запись данных в ES в режиме RDD
SparkConf conf = new SparkConf().setAppName("my-app").setMaster("local[2]")
.set("es.nodes", "https://es-jnycbqnd.public.tencentelasticsearch.com")
.set("es.port", "9200")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", "passwd")
.set("es.nodes.wan.only", "true")
.set("es.resource", "spark_write/_doc")
.set("es.nodes.discovery","false")
.set("es.input.use.sliced.partitions","false")
.set("es.scroll.size","500");
JavaSparkContext sc = new JavaSparkContext(conf);
Map<String, ?> logs = ImmutableMap.of("yesyes", "255.255.255.254",
"request", "POST /write/using_spark_rdd HTTP/1.1",
"status", 200,"size", 802,
"@timestamp", 895435190);
List<Map<String, ?>> list = ImmutableList.of(logs);
JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);
JavaEsSpark.saveToEs(javaRDD, "spark_write/_doc");
sc.stop();
}
}
public class RealTime_Data {
public static void main(String[] args) throws Exception {
String master = "local[2]";
SparkConf conf = new SparkConf().setMaster(master).setAppName("StreamingTest")
.set("spark.es.nodes", "43.139.24.126")//Укажите адрес es
.set("spark.es.port", "9200")
.set("spark.es.nodes.wan.only","true");//Укажите порт es
//Указываем 5 секунд для получения данных Kafka
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.sparkContext().setLogLevel("WARN");
String brokers = "43.139.24.126:9092";
String groupId = "kafka";//идентификатор группы потребителей
String topics = "test";//topic
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//Получаем данные в течение 1 секунды и конвертируем их в rddstream
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
//Получаем значение в каждом сообщении
JavaDStream<String> lines = messages.map(record -> record.value());
//Запишите его в формате, который можно будет вставить в Elasticsearch
JavaDStream<String> out = lines.map(str -> "{\"test\":"+str+"}");
//Печать
out.print();
//Запись в Elasticsearch
JavaEsSparkStreaming.saveJsonToEs(out, "/spark/doc");
//Начинаем трансляцию
jssc.start();
// Подождите, пока производитель отправит данные
jssc.awaitTermination();
jssc.stop();
}
}
Вы также можете написать напрямуюESИли привести указанную структуру данныхMap<String,String>
JavaEsSparkStreaming.saveToEs(JavaDStream , "<resource>");
JavaEsSparkStreaming.saveToEsWithMeta(JavaDStream, "spark/docs", Map<String,String>());
параметр | иллюстрировать |
---|---|
es.nodes | Адрес доступа Elasticsearch |
es.port | Номер порта доступа ES 9200 |
es.net.http.auth.user | Имя пользователя ES |
es.net.http.auth.pass | Пароль пользователя ES |
es.nodes.wan.only | Следует ли выполнять прослушивание узла |
es.nodes.discovery | Отключить ли обнаружение узлов |
es.index.auto.create | Автоматически создавать индексный переключатель |
es.resource | Укажите индекс и тип для чтения и записи. |
es.mapping.names | Сопоставление имени поля таблицы и индексного поля Elasticsearch |
es.input.use.sliced.partitions | Включить ли раздел слайса |
Измените общедоступный IP-адрес в коде на внутренний IP-адрес и выберите maven. assembly pluginруководить Пакет,Загрузите пакет jar с зависимостями в EMR.,Запустите «ЧитатьES»
su - hadoop
cd /usr/local/service/spark
./bin/spark-submit --master yarn --executor-cores 1 --class "ReadES" /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar
Запустите «SparkWriteEs»
./bin/spark-submit --master yarn --executor-cores 1 --class "SparkWriteEs" /home/hadoop/esspark-1.0-SNAPSHOT-jar-with-dependencies.jar
Запросить данные по кибане
GET SparkWriteEs/_search
1. После загрузки проекта Пакета появилось сообщение об ошибке, что класс не найден.
Exception in thread "main" java.lang.NoClassDefFoundError: org/elasticsearch/spark/rdd/api/java/JavaEsSpark...
анализировать
Это показывает, что зависимости ESspark отсутствуют. Иллюстрировать это потому, что пакет не добавляет зависимости, что приводит к ошибкам при выполнении кода.
Решение
Используйте пакет сборки для загрузки зависимых пакетов jar.
2. Проблемы с подключением возникают при прямом доступе клиента
Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'...
анализировать:
Доступ к общедоступному сетевому адресу ES возможен напрямую.,Параметр имени пользователя и пароля заполнен.,Проблем с заполнением параметра «es.nodes.wan.only» нет, но проблема с соединением с ES все еще существует.,Индексы не создаются,Возможно, проблема связана с версией конфигурации или пакета зависимостей.
Решение
Проблема с конфигурацией параметра ES.resource, тип Правильный пример: «spark_write/_doc» не заполнен