PySpark существовать DataFrameReader предоставлено наcsv("path")
Воля CSV документчитать в PySpark DataFrame и сохрани или напиши CSV документиз ФункцияdataframeObj.write.csv("path")
,существовать В этой статье Юн Дуоцзюнь Воляи все узнают, как Воля.местный Оглавлениесерединаизодинокийдокумент, несколькодокумент、вседокументчитать в DataFrame, примените некоторые преобразования и, наконец, используйте PySpark Примером может быть DataFrame напиши ответ CSV документ。
PySpark Поддерживает чтение с использованием вертикальной черты, запятой, табуляции, пробела или любого другого символа-разделителя документа. CSV документ。
Уведомление: готов к использованию из коробки PySpark Поддержка будет CSV、JSON и Болеедокумент Форматиздокументчитатьприезжать PySpark DataFrame середина.
Уседатафрамеридер из csv("path")
или format("csv").load("path")
,Может ли Воля CSV-документ читаться в PySpark DataFrame?, эти методы воля принимают чтение документа в качестве параметра. Когда использовать format("csv")
методчас,Также можно указать источник данных по его полному имени.,Но для встроенных источников,может простоиспользоватьониизкороткое имя(csv
、json
、parquet
、jdbc
、text
ждать).
Видеть GitHub начальствоизданныенаборzipcodes.csv。
Портал: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.csv)
spark = SparkSession.builder().master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()
использовать полное изданное имя источника, вы также можете сделать следующее.
df = spark.read.format("csv")
.load("/tmp/resources/zipcodes.csv")
# или
df = spark.read.format("org.apache.spark.sql.csv")
.load("/tmp/resources/zipcodes.csv")
df.printSchema()
этот Примером может бедныечитатьto DataFrame Список"_c0"
середина,для первого столбцаи"_c1"
второй столбец,И так далее. По умолчанию,Все эти типы столбцов рассматриваются как строки.
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
Если входной документ имеет имя столбца из заголовка, вам необходимо указать заголовок Параметры использовать явно, не упоминая об этом. option("header", True)
,API Думайте о заголовках как о записях данных.
вам нужноиспользоватьoption("header", True)
Явнодля"header"
ПараметрыобозначениедляTrue
,Если не установлено,тогда по умолчанию Воля "header"
Название как запись данных.
df2 = spark.read.option("header",True) \
.csv("/tmp/resources/zipcodes.csv")
# df2 = spark.read.csv("/tmp/resources/zipcodes.csv",header=True)
Как упоминалось ранее, PySpark По умолчанию Волявсе столбцы читать для строки (StringType). Я Волясуществовать назад, чтобы узнать, как от записи заголовка читать. schema
(inferschema) и на основеданныепроизводнаяinferschema
Списоктип。
использоватьread.csv()
Метод в порядке Прочитать несколько csv документ,Просто передайте все имена документов через запятую для путей.,Например:
df = spark.read.csv("path1,path2,path3")
Только Воля Оглавлениеделатьдляcsv()
методизпуть, пройденный кметод,Мы можем воля Оглавление из всех CSV-документов прочитать в DataFrame середина.
df = spark.read.csv("Folder path")
PySpark Обеспечивает разнообразную обработку данных CSV-документаSETИз параметров. Ниже приведены некоторые из наиболее важных параметров, объясненные на примерах.
Ссылки можно использовать option(self, key, value)
Приходитьиспользовать Несколько Параметры。Долженметод Есть альтернативаметод:options(self, **options)
,Эффект тот же.
Параметры delimiter
используется для указания CSV документировать разделитель столбцов. По умолчанию это запятая(,) характер。Можетиспользоватьэтот Параметры Воляего настройкидлялюбойхарактер,НапримерТруба (|), вкладка (\t), пробел. Все это должно быть основано на реальности. CSV dataSET документиз конкретной настройки формы.
df3 = spark.read.options(delimiter=',') \
.csv("C:/PyDataStudio/zipcodes.csv")
Это значение параметра по умолчанию установлено для False
,Настройки True
, Spark автоматически определит тип столбца на основе данных.
df4 = spark.read.options(inferSchema='True',
delimiter=',') \
.csv("PyDataStudio/zipcodes.csv")
или,также Можетчтобы передать ссылкуoption()
метод Приходитьписатьэто。
df4 = spark.read.option("inferSchema",True) \
.option("delimiter",",") \
.csv("PyDataStudio/zipcodes.csv")
Используйте этот параметр CSV Первая строка документа — для. По умолчанию это значение параметра False
, и все типы столбцов считаются строками.
df5 = spark.read.options(header='True',
inferSchema='True',
delimiter=',') \
.csv("PyDataStudio/zipcodes.csv")
использовать, когда есть столбец с разделителем, используемым для разделения столбцов quotes
Параметрыобозначениекавычкихарактер,По умолчанию это''
,А разделитель Воля внутри кавычек игнорируется. Но эти параметры,Можно установить любой символ.
использовать nullValues
Параметры,Вы можете указать пустое значение в строке в формате CSV. Например,если Воля"1900-01-01"
существовать DataFrame Установите значение null столбец «Дата».
Параметры dateFormat
для настройки входа DateType
и TimestampType
Столбец в формате из параметров. Поддержать всех java.text.SimpleDateFormat
Формат.
Уведомление: В дополнение к вышеуказанным параметрам PySpark CSV API также поддерживает множество других параметров.,Вы можете проверить официальную документацию PySpark.
еслизнать заранеедокументиз Архитектураи Не хочуиспользоватьinferSchema
Параметры Приходитьобозначение Списокимяитип,пожалуйстаиспользоватьобозначениеиз Настроить Списокимяschemaииспользоватьschema
Параметрытип。
schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True) \
.add("City",StringType(),True) \
.add("State",StringType(),True) \
.add("LocationType",StringType(),True) \
.add("Lat",DoubleType(),True) \
.add("Long",DoubleType(),True) \
.add("Xaxis",IntegerType(),True) \
.add("Yaxis",DoubleType(),True) \
.add("Zaxis",DoubleType(),True) \
.add("WorldRegion",StringType(),True) \
.add("Country",StringType(),True) \
.add("LocationText",StringType(),True) \
.add("Location",StringType(),True) \
.add("Decommisioned",BooleanType(),True) \
.add("TaxReturnsFiled",StringType(),True) \
.add("EstimatedPopulation",IntegerType(),True) \
.add("TotalWages",IntegerType(),True) \
.add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv") \
.option("header", True) \
.schema(schema) \
.load("/PyDataStudio/zipcodes.csv")
от CSV документсоздавать DataFrame После этого вы можете подать заявку DataFrame поддерживатьизвсе Конвертироватьи Держатьделать。
использоватьPySpark DataFrameWriter объектизwrite()
метод Воля PySpark DataFrame писать CSV документ。
df.write.option("header",True) \
.csv("/PyDataStudio/spark_output/zipcodes")
существоватьписать CSV В документе вы можете использовать несколько параметров. Например, установите header
для True
Воля DataFrame Имена столбцов используются в качестве заголовков для записи вывода и использования. delimiter
существовать CSV Указанный разделитель в выходном документе.
df2.write.options(header='True',
delimiter=',') \
.csv("/PyDataStudio/spark_output/zipcodes")
Другие доступные параметры quote
, escape
, nullValue
, dateFormat
, quoteMode
。специфический Можетпросмотреть официальную документацию。
PySpark DataFrameWriter Есть еще один mode()
Метод для указания режима сохранения.
overwrite
– Шаблон используется для перезаписи существующего документа.append
– Воляданные добавлены к существующему документу.ignore
– Игнорировать пишет, когда документ уже существует.error
– Это параметр по умолчанию, который возвращает ошибку, если документ уже существует.df2.write.mode('overwrite') \
.csv("/PyDataStudio/spark_output/zipcodes")
# Вы также можете написать так
df2.write.format("csv") \
.mode('overwrite') \
.save("/PyDataStudio/spark_output/zipcodes")
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.read.csv("/PyDataStudio/zipcodes.csv")
df.printSchema()
df2 = spark.read.option("header",True) \
.csv("/PyDataStudio/zipcodes.csv")
df2.printSchema()
df3 = spark.read.options(header='True', delimiter=',') \
.csv("/PyDataStudio/zipcodes.csv")
df3.printSchema()
schema = StructType() \
.add("RecordNumber",IntegerType(),True) \
.add("Zipcode",IntegerType(),True) \
.add("ZipCodeType",StringType(),True) \
.add("City",StringType(),True) \
.add("State",StringType(),True) \
.add("LocationType",StringType(),True) \
.add("Lat",DoubleType(),True) \
.add("Long",DoubleType(),True) \
.add("Xaxis",IntegerType(),True) \
.add("Yaxis",DoubleType(),True) \
.add("Zaxis",DoubleType(),True) \
.add("WorldRegion",StringType(),True) \
.add("Country",StringType(),True) \
.add("LocationText",StringType(),True) \
.add("Location",StringType(),True) \
.add("Decommisioned",BooleanType(),True) \
.add("TaxReturnsFiled",StringType(),True) \
.add("EstimatedPopulation",IntegerType(),True) \
.add("TotalWages",IntegerType(),True) \
.add("Notes",StringType(),True)
df_with_schema = spark.read.format("csv") \
.option("header", True) \
.schema(schema) \
.load("/PyDataStudio/zipcodes.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
.csv("/PyDataStudio/spark_output/zipcodes123")