spark-submit --files обычно используется для загрузки внешних файлов ресурсов и доступа к ним в процессах драйвера и исполнителя.
–files и –jars по сути одинаковы
spark-submit --files file_paths
Среди них file_paths может быть разными способами: file: | hdfs:// | http:// | ftp:// | local:(Разделяйте несколько путей запятыми.
)
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal xxx.com \
--keytab /xxx/keytabs/xxx.keytab \
--driver-java-options "-Dspring.profiles.active=prod -Dorg.springframework.boot.logging.LoggingSystem=none -Djava.ext.dirs=/xxx/CDH-x.x.x-1.cdhx.x.x.p0.xxx/lib/spark/jars/gson-2.8.1.jar,/xxx/CDH-x.x.x-1.cdhx.1.1.p0.xxx/lib/hive/lib/* -Dspark.yarn.dist.files=/xxx/CDH-x.1.1-1.cdhx.1.1.p0.xxx/etc/hadoop/conf.dist/yarn-site.xml" \
--driver-class-path "/xxx/CDH-x.x.x-x.cdhx.x.x.p1000.xxx/jars/gson-2.8.1.jar:$JAVA_HOME/jre/lib/ext/*:/xxx/CDH-x.x.x-1.cdhx.x.x.p1000.xxx/jars/commons-cli-1.4.jar" \
--driver-cores $dc \
--driver-memory $dm \
--num-executors $ne \
--executor-cores $ec \
--executor-memory $em \
--conf spark.sql.crossJoin.enabled=true \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.driver.maxResultSize=20G \
--conf spark.sql.shuffle.partitions=$partitions \
--conf spark.default.parallelism=$partitions \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128000000 \
--conf spark.yarn.queue=xxx \
--conf spark.shuffle.io.maxRetries=200 \
--conf spark.shuffle.io.retryWait=30 \
--conf spark.port.maxRetries=120 \
--conf spark.core.connection.ack.wait.timeout=6000 \
--conf spark.shuffle.sort.bypassMergeThreshold=300 \
--conf spark.hadoop.hive.exec.dynamic.partition=$dp \
--conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
--conf spark.hadoop.hive.exec.max.dynamic.partitions=2000 \
--name $name \
--files "$files" \ #/путь/локальный файл сервера
--class xxxApplication /xxx/xxx-1.0-SNAPSHOT.jar -jn $obj -sq "$sql" -ptby $ptby
//If you add your external files using "spark-submit --files" your files will be uploaded to this HDFS folder: hdfs://your-cluster/user/your-user/.sparkStaging/application_1449220589084_0508
//application_1449220589084_0508 is an example of yarn application ID!
//1. find the spark staging directory by below code: (but you need to have the hdfs uri and your username)
System.getenv("SPARK_YARN_STAGING_DIR"); --> .sparkStaging/application_1449220589084_0508
//2. find the complete comma separated file paths by using:
System.getenv("SPARK_YARN_CACHE_FILES"); -->
hdfs://yourcluster/user/hdfs/.sparkStaging/application_1449220589084_0508/spark-assembly-1.4.1.2.3.2.0-2950-hadoop2.7.1.2.3.2.0-2950.jar#__spark__.jar,
hdfs://yourcluster/user/hdfs/.sparkStaging/application_1449220589084_0508/your-spark-job.jar#__app__.jar,
hdfs://yourcluster/user/hdfs/.sparkStaging/application_1449220589084_0508/test_file.txt#test_file.txt
--files загрузит файлы в каталог .sparkStagin/applicationId hdfs.
spark.read().textFile(System.getenv("SPARK_YARN_STAGING_DIR") + "/xxx.xxx")
textFile Если вы не указали hdfs, file или другие префиксы, по умолчанию используется относительный путь в hdfs://yourcluster/user/your_username.
SparkFiles.get(fileName)
Применяется к локальному режиму
JavaRDD<String> stringJavaRDD = sparkcontext.textFile(SparkFiles.get(fileName));
List<String> collect = stringJavaRDD.collect();
[Примечания]
В режиме кластера (-- Deploy-mode Cluster) -- файлы должны использовать глобально видимый адрес (например, hdfs), иначе драйвер не сможет найти файл и возникнет исключение FileNotFoundException. Это связано с тем, что драйвер будет работать на любом рабочем узле кластера, и файл невозможно найти по локальному адресу. Исключение FileNotFoundException возникает в методе инициализации getOrCreate() SparkSession, поскольку этот метод вызывает addFile(), но файл не может быть найден, что приводит к сбою инициализации SparkSession. Примечание. Принцип –jars тот же, но при вызове addJars в getOrCreate() возникает исключение, но это не приведет к сбою инициализации SparkSession, и программа продолжит работу.
Стоит отметить, что в режиме кластера, spark-submit --deploy-mode class path-to-jar, где path-to-jar также должен быть глобальным видимым путем, в противном случае произойдет исключение, что jar не будет найден.
FileInputStream sqlstream = new FileInputStream(fileName);
StringBuilder sqlContent = new StringBuilder();
Scanner scanner = new Scanner(sqlstream, "UTF-8");
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
sqlContent.append(line).append("\n");
}
Применимо к местной и пряжи client、yarn clusterмодель
,
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("test.properties"));
Подходит для режимов пряжи и кластера.