В этой статье Юн Дуоджун научится вместе с вами писать файлы Parquet из PySpark DataFrame, считывать файлы Parquet в DataFrame и создавать представления/таблицы для выполнения SQL-запросов. Также узнайте, как секционировать данные из файлов Parquet и получать секции с помощью SQL для повышения производительности.
Pyspark SQL при условии, что Parquet Чтение файлов DataFrame Казусё DataFrame писать Parquet документ,DataFrameReader
иDataFrameWriter
Правильный методparquet()
соответственно для чтенияиписать/создавать Parquet документ. Паркет Файл сохраняет схему вместе с данными, поэтому он используется для обработки структурированных файлов.
Вот как PySpark Читать в написании Parquet Простое описание файла, которое я подробно объясню в следующем разделе.
df.write.parquet("/tmp/out/people.parquet")
parDF1=spark.read.parquet("/temp/out/people.parquet")
Я подробно объяснил это раньше. такое файл паркета и ее родственник CSV、JSON и другие преимущества форматов текстовых файлов.
Apache Parquet
файл представляет собой столбчатый формат хранения, подходящий для Hadoop Любой проект в экосистеме, независимо от выбранной платформы обработки данных, модели данных или языка программирования.
https://parquet.apache.org/
При запросе к колоночному хранилищу ненужные данные очень быстро пропускаются, что приводит к более быстрому выполнению запроса. Таким образом, агрегированные запросы требуют меньше времени по сравнению с базами данных, ориентированными на строки.
Parquet поддерживает расширенные вложенные структуры данных и поддерживает эффективные параметры сжатия и схемы кодирования.
Pyspark SQL Поддержите чтение написать Parquet файл автоматически фиксирует схему необработанных данных, а также снижает среднее значение 75% хранения данных. Писпарк Поддерживается по умолчанию в своей библиотеке Parquet, поэтому нам не нужно добавлять какие-либо зависимые библиотеки.
так как у нас нет Parquet файл, с которого мы начинаем DataFrame писать Паркет. Во-первых, как использовать spark.createDataFrame()
Создать из списка данных Pyspark DataFrame。
data =[("James ","","Smith","36636","M",3000),
("Michael ","Rose","","40288","M",4000),
("Robert ","","Williams","42114","M",4000),
("Maria ","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)]
columns=["firstname", "middlename",
"lastname", "dob",
"gender", "salary"]
df=spark.createDataFrame(data,columns)
В приведенном выше примере создается DataFrame, содержащий firstname、middlename、lastname、dob、gender、salary
Список.
сейчассуществовать ПозвонивDataFrameWriterдобрыйизparquet()
функция отPySpark DataFrameсоздавать паркетный документ. Когда ВоляDataFrame пишет паркетный документ, он автоматически сохраняет имена столбцов и их типы данных. Pysparkсоздаватькаждый раздел имеет документ .parquet
Расширение файла.
df.write.parquet("/PyDataStudio/output/people.parquet")
Pyspark существовать DataFrameReader добрыйсерединапоставлятьполучил одинparquet()
метод Воля Parquet Чтение файлов кадр данных. Ниже представлено завещание Parquet Файл, прочитанный в dataframe пример.
parDF=spark.read.parquet("/PyDataStudio/output/people.parquet")
использовать append
Режим сохранения «Добавить» позволяет добавить фрейм данных к существующему. Parquet документсередина.Чтобы перезаписатьиспользовать overwrite
Перезаписать режим сохранения.
df.write.mode('append') \
.parquet("/PyDataStudio/output/people.parquet")
df.write.mode('overwrite') \
.parquet("/PyDataStudio/output/people.parquet")
Pyspark Sql поставлятьсуществовать Parquet Создайте временное представление файла для выполнения. sql Запрос。существоватьтыиз Хранение программсуществовать До,Эти представления доступны.
parqDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
сейчассуществовать Приходите и посмотритесуществовать Parquet Выполнить в файле SQL Запрос. чтобы выполнить sql Запросы, мы не следуем DataFrame серединасоздавать,Но прямосуществовать parquet Создайте временное представление или таблицу в файле.
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()
существоватьздесь мы начнем с people.parquet
файл создает временное представление PERSON
. Это дает следующие результаты.
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Michael | Rose| |40288| M| 4000|
| James | | Smith|36636| M| 3000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
когда мы PERSON
Когда таблица изучает конкретный Запрос, она сканирует все строки и возвращает результаты. Это похоже на традиционные изданные библиотеки Запрососуществлять. существовать PySpark , мы можем использоватьиспользовать PySpark partitionBy()
пара методовданныераздел,Улучшите Запрососуществлять оптимизированным способом.
df.write.partitionBy("gender","salary") \
.mode("overwrite") \
.parquet("/PyDataStudio/output/people2.parquet")
при проверке people2.parquet
файл, он имеет два раздела gender
и salary
。
В следующем примере объясняется разделение Parquet Файл, прочитанный в gender=M
из DataFrame середина.
parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)
Вышеупомянутый результат Примера показан ниже.
+---------+----------+--------+-----+------+
|firstname|middlename|lastname|dob |salary|
+---------+----------+--------+-----+------+
|Robert | |Williams|42114|4000 |
|Michael |Rose | |40288|4000 |
|James | |Smith |36636|3000 |
+---------+----------+--------+-----+------+
существоватьздесь,Усуществовать зону паркетной доски начальствосоздавать этот стол,И быстрее изучить один, чем никакой раздел из таблицы, изучить из Запрос,Это улучшает производительность.
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()
Вышеупомянутый результат Примера показан ниже.
+---------+----------+--------+-----+------+
|firstname|middlename|lastname| dob|salary|
+---------+----------+--------+-----+------+
| Maria | Anne| Jones|39192| 4000|
| Jen| Mary| Brown| | -1|
+---------+----------+--------+-----+------+
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("parquetFile").getOrCreate()
data =[("James ","","Smith","36636","M",3000),
("Michael ","Rose","","40288","M",4000),
("Robert ","","Williams","42114","M",4000),
("Maria ","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df=spark.createDataFrame(data,columns)
df.write.mode("overwrite").parquet("/PyDataStudio/output/people.parquet")
parDF1=spark.read.parquet("/PyDataStudio/output/people.parquet")
parDF1.createOrReplaceTempView("parquetTable")
parDF1.printSchema()
parDF1.show(truncate=False)
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show(truncate=False)
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()
df.write.partitionBy("gender","salary").mode("overwrite").parquet("/PyDataStudio/output/people2.parquet")
parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()