Зачем вам нужен РДД?
Что такое РДД?
определение СДР
5 основных характеристик RDD
Функции RDD — память не требуется
СДР в WordCount
Существует два способа создания РДД в PySpark.
Параллельное создание RDD rdd1=sc.paralleise([1,2,3,4,5])
Создать RDD из файла
rdd2=sc.textFile(“hdfs://node1:9820/pydata”)
Код:
# -*- coding: utf-8 -*-
# Program функция: два способа создания RDD
'''
Первый способ: использование распараллеленных коллекций, по сути, передача локальной коллекции в качестве параметра в sc.pa.
Второй способ: используйте sc.textFile для чтения внешних файловых систем, включая hdfs и локальные файловые системы.
1. Подготовьте вход в SparkContext и подайте заявку на ресурсы.
2- Первый метод с использованием создания rdd
3-секундный метод с использованием создания rdd
4-Закрыть SparkContext
'''
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
print("=========createRDD==============")
# 1 - Подготовьте вход в SparkContext и подайте заявку на ресурсы.
conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
sc = SparkContext(conf=conf)
# 2 - Первый метод, созданный с использованием rdd
collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
print(collection_rdd.collect()) # [1, 2, 3, 4, 5, 6]
# 2-1 Как использовать API, чтобы получить количество Разделов в rdd
print("rdd numpartitions:{}".format(collection_rdd.getNumPartitions())) # 5
# 3 - Второй метод, созданный с использованием rdd
file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/words.txt")
print(file_rdd.collect())
print("rdd numpartitions:{}".format(file_rdd.getNumPartitions())) # 2
# 4 - ЗакрытьSparkContext
sc.stop()
Чтение небольшого файла
Создать RDD из внешних данных
# -*- coding: utf-8 -*-
# Program функция: два способа создания RDD
'''
1. Подготовьте вход в SparkContext и подайте заявку на ресурсы.
2. Чтение внешних файлов с помощью методов sc.textFile и sc.wholeTextFile.
3-ЗакрытьSparkContext
'''
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
print("=========createRDD==============")
# 1 - Подготовьте вход в SparkContext и подайте заявку на ресурсы.
conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
sc = SparkContext(conf=conf)
# 2 - Для чтения внешних файлов используйте методы sc.textFile и sc.wholeTextFile\
file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100")
wholefile_rdd = sc.wholeTextFiles("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100")
print("file_rdd numpartitions:{}".format(file_rdd.getNumPartitions()))#file_rdd numpartitions:100
print("wholefile_rdd numpartitions:{}".format(wholefile_rdd.getNumPartitions()))#wholefile_rdd numpartitions:2
print(wholefile_rdd.take(1))# путь, конкретное значение
# Как получить Wholefile_rdd, чтобы получить конкретное значение
print(type(wholefile_rdd))#<class 'pyspark.rdd.RDD'>
print(wholefile_rdd.map(lambda x: x[1]).take(1))
# 3 - ЗакрытьSparkContext
sc.stop()
* Как просмотреть Раздел rdd? getNumPartitions()
Расширенное чтение: Как определить количество разделов RDD
# -*- coding: utf-8 -*-
# Program функция: два способа создания RDD
'''
Первый способ: использование распараллеленных коллекций, по сути, передача локальной коллекции в качестве параметра в sc.pa.
Второй способ: используйте sc.textFile для чтения внешних файловых систем, включая hdfs и локальные файловые системы.
1. Подготовьте вход в SparkContext и подайте заявку на ресурсы.
2- Первый метод с использованием создания rdd
3-секундный метод с использованием создания rdd
4-Закрыть SparkContext
'''
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
print("=========createRDD==============")
# 1 - Подготовьте вход в SparkContext и подайте заявку на ресурсы.
conf = SparkConf().setAppName("createRDD").setMaster("local[*]")
# conf.set("spark.default.parallelism",10)#Переписать степень параллелизма по умолчанию, 10
sc = SparkContext(conf=conf)
# 2 - Первый метод, созданный с использованием rdd,
collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6],5)
# 2-1 Как использовать API, чтобы получить количество Разделов в rdd
print("rdd numpartitions:{}".format(collection_rdd.getNumPartitions())) #2
# Резюме: Sparkconf устанавливает local[5] (степень параллелизма по умолчанию), sc.parallesise напрямую использует Раздел, и число равно 5.
# Если установлен spark.default.parallelism, степень параллелизма по умолчанию, sc.parallesise напрямую использует номер раздела, равный 10.
# Наивысший приоритет имеет второй параметр внутри функции. 3
# 2-2 Как распечатать содержимое каждого раздела
print("per partition content:",collection_rdd.glom().collect())
# 3 - Второй метод, созданный с использованием rdd
# minPartitions — это наименьшее количество разделов, а окончательное количество разделов основано на фактической печати.
file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/words.txt",10)
print("rdd numpartitions:{}".format(file_rdd.getNumPartitions()))
print(" file_rdd per partition content:",file_rdd.glom().collect())
# Если sc.textFile читает несколько файлов в папке, количество Разделов здесь в основном зависит от количества файлов, и Раздел, написанный вами, не будет работать.
# file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100", 3)
# 4 - ЗакрытьSparkContext
sc.stop()
* Сначала проясните ситуацию,Количество разделов,Здесь все основано на том, что вы видите,Особенно в sc.textFile
📢Домашняя страница блога:https://manor.blog.csdn.net
📢Лайки приветствуются 👍 собирать ⭐Оставьте сообщение 📝 Поправьте меня, если есть ошибки! 📢Эту статью написал Maynor Оригинал, впервые опубликовано на Блог CSDN🙉 📢Мне кажется,что самый ласковый и долгий взгляд в этой жизни отдан моему мобильному телефону⭐ 📢Колонка постоянно обновляется,Добро пожаловать на подписку:https://blog.csdn.net/xianyu120/category_12453356.html