Прежде чем читать эту статью, необходимо прочитать:
1. Краткое изложение основных концепций и архитектуры «серии Apache Hudi».
2. Руководство по началу работы с Apache Hudi серии Hudi | Интеграция SparkSQL+Hive+Presto |
3. Выпущена версия Apache Hudi 0.11, краткий обзор новых функций!
Apache Hudi (произносится как «Hudi») предоставляет следующие примитивы потоковой передачи в наборе данных DFS.
В этом разделе мы обсуждаем важные концепции и терминологию, которые помогают понять и эффективно использовать эти примитивы.
По своей сути Hudi ведет список различных мгновенных событий. времяграфик всех операций над набором данных времени,тем самым обеспечивая,Наборы данных при разных обстоятельствах получены в разные моменты времени. Hudi Instant содержит следующие компоненты
Гарантия Hudi на графике Атомарность и мгновенность операций, выполняемых во времени. времяизграфик постоянство времени.
Ключевые выполняемые операции включают в себя
В любой момент времени может находиться в одном из следующих состояний
В приведенном выше примере показаны события обновления, которые произошли примерно между 10:00 и 10:20 в наборе данных Hudi.,Примерно каждые 5 минут,Храните метаданные коммитов и другую фоновую очистку/сжатие на Hudiграфик времени.
Ключевой момент, на который следует обратить внимание: время фиксации указывает время прибытия данных (10:20 утра), тогда как фактическая организация данных отражает фактическое время или время события, которое отражают данные (часовой период, начинающийся в 07:00). . Это два ключевых понятия при оценке задержки и целостности данных.
Если данные приходят с опозданием (данные с временем события 9:00, поступающие в 10:20, задержка >1 часов), мы видим, что upsert генерирует новые данные в более старые периоды времени/папки.
С помощью графика времени,Инкрементальный запрос может извлекать только новые данные, успешно отправленные после 10:00.,и очень эффективно использовать только измененные файлы,и нет необходимости сканировать большие диапазоны файлов,Например, все периоды времени после 07:00.
Hudi организует наборы данных в DFS в структуру каталогов по базовому пути. Набор данных разделен на разделы, которые представляют собой папки, содержащие файлы данных для этого раздела, подобно таблице Hive.
Каждый раздел отличается определенным путем к разделу относительно базового пути.
Внутри каждого раздела файлы организованы в группы файлов, уникально идентифицируемые идентификаторами файлов.
Каждая файловая группа содержит несколько фрагментов файлов, причем каждый фрагмент содержит файл базового столбца (*.parquet), созданный в определенный момент фиксации/сжатия, и набор файлов журналов (*.log*), содержащих самостоятельно созданную вставку/обновление для базовый файл, поскольку базовый файл.
Hudi использует дизайн MVCC, при котором операция сжатия объединяет файлы журнала и базовые файлы для создания новых фрагментов файлов, а операция очистки удаляет неиспользуемые/старые фрагменты файлов, чтобы освободить место в DFS.
Hudi обеспечивает эффективную Upsert, сопоставляя заданный ключ с капюшоном (ключ записи + путь к разделу) группе файлов с помощью механизма индексирования.
После того как первая версия записи записана в файл, сопоставление между ключами записи и файловой группой/идентификатором файла никогда не меняется. Проще говоря, сопоставленная файловая группа содержит все версии набора записей.
Hudiтип хранилище определяет, как данные индексируются и размещаются в DFS и как вышеуказанные примитивы и график реализуются поверх этой организации. время активности (т.е. как записывать данные).
Представления, в свою очередь, определяют, как базовые данные подвергаются запросам (т. е. как данные читаются).
тип хранилища
HudiПоддержите следующеетип хранилища。
В таблице ниже приведены компромиссы между двумя типами типов хранения.
вид
Hudi поддерживает следующие способы хранения данных:
В таблице ниже приведены компромиссы между различными вариантами.
Части файлов в хранилище с обнаружением при записи содержат только файлы базы/столбца, и каждая фиксация создает новую версию базового файла.
Другими словами, мы сжимаем каждый коммит, чтобы все данные сохранялись в виде данных столбца. В этом случае запись данных обходится очень дорого (нам нужно перезаписать весь файл данных столбца, даже если зафиксирован только один байт новых данных), при этом стоимость чтения данных не увеличивается.
Этот вид облегчает чтение тяжелой аналитической работы.
Ниже объясняется, как это работает, когда вы записываете данные в хранилище с сохранением при записи и выполняете к ним два запроса.
По мере записи данных обновления существующей файловой группы будут генерировать новый срез для этой файловой группы с отметкой времени немедленной фиксации, а вставки выделяют новую файловую группу и записывают первый срез этой файловой группы.
Эти фрагменты файлов и время их фиксации отмечены цветом выше.
Запустите SQL-запросы к такому набору данных (например: выберите count(*) подсчитывает количество записей в разделе), сначала проверьте график времени на последние коммиты и фильтровать все файлы, кроме последних, в каждой файловой группе.
Как видите, старые запросы не увидят файлы текущего коммита, отмеченные розовым цветом, но новые запросы после этого коммита получат новые данные. Таким образом, на запрос не влияют сбои записи или частичная запись, и он выполняется только на зафиксированных данных.
Целью хранилища с сохраненными копиями при записи является фундаментальное улучшение текущего способа управления наборами данных с помощью следующих методов.
Объединение хранилища при «копчтения» — это обновленная версия слова «копировать» на момент его написания, в том смысле, что оно все еще может пройти Читать оптимизированную Таблица обеспечивает вид оптимизации во время чтения (функциональность копирования во время записи) наборов данных.
также,Он сохраняет вставки обновлений для каждой файловой группы в дельта-журнал на основе строк.,по идентификатору файла,Объединить инкрементальные журналы с последней версией базового файла.,тем самым обеспечивая Близко к реальностичасизданные Запрос。поэтому,Этот тип хранилища разумно балансирует затраты на чтение и запись.,для предоставления запросов практически в реальном времени.
Самым важным здесь является компрессор, который теперь тщательно выбирает, какие дельта-журналы необходимо сжать в столбчатый базовый файл (в зависимости от размера файла дельта-журнала) для поддержания производительности запросов (большие дельта-журналы будут улучшаться практически в реальном времени). время запроса и требует более длительного времени слияния).
Ниже объясняется, как работает хранилище, и показаны запросы к таблицам, работающим практически в реальном времени и оптимизированным для чтения.
В этом примере происходит много интересных вещей, которые раскрывают тонкости метода.
Объединение хранилища при Цель чтения состоит в том, чтобы обеспечить обработку практически в реальном времени непосредственно в DFS, а не отправлять данные для копирования в выделенную систему, которая может быть не в состоянии обрабатывать большие объемы данных. данныеколичество。
Это хранилище также имеет некоторые другие преимущества, такие как уменьшение усиления записи, которое представляет собой объем данных, записываемых на 1 байт данных в пакете, за счет предотвращения синхронного слияния данных.
Перед этим разберитесь с источником данных Hudi и разницей. Три различных операции, обеспечиваемые инструментом косы. записи и способы их наилучшего использования могут оказаться полезными. Эти операции можно выбрать/изменить при каждой фиксации/дельта-фиксации, выполненной для набора данных.
Утилита HoodieDeltaStreamer (часть hudi-utilities-bundle) обеспечивает возможность приема данных из различных источников, таких как DFS или Kafka, и имеет следующие функции.
Параметры командной строки описывают эти функции более подробно:
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` --help
Usage: <main class> [options]
Options:
--commit-on-errors
Commit even when some records failed to be written
Default: false
--enable-hive-sync
Enable syncing to hive
Default: false
--filter-dupes
Should duplicate records from source be dropped/filtered outbefore
insert/bulk-insert
Default: false
--help, -h
--hudi-conf
Any configuration that can be set in the properties file (using the CLI
parameter "--propsFilePath") can also be passed command line using this
parameter
Default: []
--op
Takes one of these values : UPSERT (default), INSERT (use when input is
purely new data/inserts to gain speed)
Default: UPSERT
Possible Values: [UPSERT, INSERT, BULK_INSERT]
--payload-class
subclass of HoodieRecordPayload, that works off a GenericRecord.
Implement your own, if you want to do something other than overwriting
existing value
Default: org.apache.hudi.OverwriteWithLatestAvroPayload
--props
path to properties file on localfs or dfs, with configurations for
Hudi client, schema provider, key generator and data source. For
Hudi client props, sane defaults are used, but recommend use to
provide basic things like metrics endpoints, hive configs etc. For
sources, referto individual classes, for supported properties.
Default: file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
--schemaprovider-class
subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach
schemas to input & target table data, built in options:
FilebasedSchemaProvider
Default: org.apache.hudi.utilities.schema.FilebasedSchemaProvider
--source-class
Subclass of org.apache.hudi.utilities.sources to read data. Built-in
options: org.apache.hudi.utilities.sources.{JsonDFSSource (default),
AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}
Default: org.apache.hudi.utilities.sources.JsonDFSSource
--source-limit
Maximum amount of data to read from source. Default: No limit For e.g:
DFSSource => max bytes to read, KafkaSource => max events to read
Default: 9223372036854775807
--source-ordering-field
Field within source record to decide how to break ties between records
with same key in input data. Default: 'ts' holding unix timestamp of
record
Default: ts
--spark-master
spark master to use.
Default: local[2]
* --target-base-path
base path for the target Hudi dataset. (Will be created if did not
exist first time around. If exists, expected to be a Hudi dataset)
* --target-table
name of the target table in Hive
--transformer-class
subclass of org.apache.hudi.utilities.transform.Transformer. UDF to
transform raw source dataset to a target dataset (conforming to target
schema) before writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which
allows a SQL query template to be passed as a transformation function)
Инструмент использует иерархическую структуру файлов свойств с подключаемыми интерфейсами для извлечения данных, генерации ключей и предоставления схем.
отKafkaиDFS摄取данныеиз Пример Конфигурацияздесь:hudi-utilities/src/test/resources/delta-streamer-config
。
Например: после того, как у вас запущен реестр Confluent Kafka и Schema, вы можете использовать эту команду для создания некоторых тестовых данных (impressions.avro, предоставляемых библиотекой кода реестра схемы).
[confluent-5.0.0]$ bin/ksql-datagen schema=../impressions.avro format=avro topic=impressions key=impressionid
Затем используйте следующую команду для приема этих данных.
[hoodie]$ spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
--op BULK_INSERT
В некоторых случаях вам может потребоваться заранее перенести существующие наборы данных в Hudi. Пожалуйста, обратитесь к руководству по миграции.
hudi-spark
Модуль обеспечиваетDataSource API, который может записывать (а также читать) любой фрейм данных в набор данных Hudi. Вот как вставить после указания названия полей, которые нужно использовать обновлениеданные帧изметод,Эти поля включают в себяrecordKey => _row_key、partitionPath => раздел и precombineKey => timestamp
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // Можно передать любые параметры клиента Hudi.
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
Оба вышеуказанных инструмента поддерживают синхронизацию последней схемы набора данных с Hive. Metastore,для запроса новых столбцов и разделов. Если вам нужно запустить его из командной строки или в автономной JVM,HudiпредоставилHiveSyncTool
,После сборки модуля hudi-hive,Его можно назвать следующим образом.
cd hudi-hive
./run_sync_tool.sh
[hudi-hive]$ ./run_sync_tool.sh --help
Usage: <main class> [options]
Options:
* --base-path
Basepath of Hudi dataset to sync
* --database
name of the target database in Hive
--help, -h
Default: false
* --jdbc-url
Hive jdbc connect url
* --pass
Hive password
* --table
name of the target table in Hive
* --user
Hive username
Hudi поддерживает два типа удаления данных, хранящихся в наборах данных Hudi, позволяя пользователям указывать различные реализации полезной нагрузки записей данных.
org.apache.hudi.EmptyHoodieRecordPayload
добрый,Он достигает этой функции.deleteDF // Кадр данных, содержащий только записи, которые нужно удалить
.write().format("org.apache.hudi")
.option(...) // Добавьте параметры HUDI, необходимые для настройки, такие как ключи записи, пути к разделам и другие параметры.
// Укажите Record_key, раздел_ключ, precombine_fieldkey и общие параметры.
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
Hudi также выполняет несколько ключевых операций управления данными, хранящимися в наборах данных Hudi. функциям хранилищам. Ключевыми аспектами хранения данных в DFS являются управление размером и количеством файлов, а также освобождение места для хранения. Например, HDFS имеет низкую производительность при обработке небольших файлов, что повлияет на имя. Память узла и RPC оказывают большое давление и могут дестабилизировать весь кластер. Механизмы запросов обычно обеспечивают более высокую производительность при работе с большими файлами столбцов, поскольку они могут эффективно амортизировать затраты на получение статистики по столбцам и т. д. Даже в некоторых облачных хранилищах данных список каталогов с большим количеством небольших файлов часто выполняется медленно.
Вот несколько способов эффективного управления хранилищем наборов данных Hudi.
концептуально,Hudi физически хранит данные один раз в DFS.,При этом на нем предусмотрено три логики,Как упоминалось ранее.
После синхронизации набора данных с Hive Metastore он предоставит внешние таблицы Hive, поддерживаемые пользовательским форматом ввода Hudi. После предоставления соответствующего пакета Hudi набор данных можно будет запрашивать через распространенные механизмы запросов, такие как Hive, Spark и Presto.
В частности, в процессе записи передаются две таблицы Hive, названные по именам таблиц. Например, если имя таблицы = hudi_tbl, мы получим
Как упоминалось в разделе «Концепции», ключевой примитив, необходимый для инкрементальной обработки, является постепенным. притяжение (чтобы получить поток/лог изменений из набора данных). Вы можете постепенно извлекать наборы данных Hudi. Это означает, что вы можете извлекать наборы данных Hudi из Начиная со времени, вы можете получать только все обновления и новые строки. Это то же самое, что вставить обновлениеиспользуются вместе,для особенно полезен для построения определенных конвейеров данных.,Включает в себя постепенное извлечение одной или нескольких исходных таблиц Hudi (потоки данных/факты) (потоки/факты) и объединение с другими таблицами (наборы данных/измерения) для записи разницы в целевой набор данных Hudi. Вид приращения достигается путем запроса одной из приведенных выше таблиц.,И иметь специальную конфигурацию,Эта специальная конфигурация указывает, что плану запроса необходимо получать только дополнительные данные из набора данных.
Далее мы подробно обсудим, как получить доступ ко всем трем видам в каждой системе запросов.
Чтобы Hive распознал набор данных Hudi и правильно запросил его,HiveServer2нужна помощьjarsпредоставлено в путиhudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar
。Это обеспечит Формат вводадобрыйи его зависимостиэлемент Доступно для Запроспланиосуществлять。
Прочитать оптимизированную таблицу {#hive-ro-view}
Помимо вышеперечисленных настроек, для билайн доступ к командной строке,Также необходимоhive.input.format
Изменятьколичествоустановлен наorg.apache.hudi.hadoop.HoodieParquetInputFormat
Формат вводаизполностью квалифицированныйпутьимя。дляTez,Также необходимоhive.tez.input.format
установлен наorg.apache.hadoop.hive.ql.io.HiveInputFormat
。
Таблица реального времени {#hive-rt-view}
Помимо установки связанных JAR-файлов Hive на HiveServer2, вам также необходимо поместить их в установку Hadoop/hive для всего кластера, чтобы запросы также могли использовать пользовательский RecordReader.
Инкрементальное извлечение {#hive-incr-pull}
HiveIncrementalPuller
разрешено пройтиHiveQLотважные факты/Таблица размеровсерединаувеличиватьколичество Извлечь изменения, Сочетает в себе преимущества Hive (для надежной обработки сложных SQL-запросов) и инкрементных примитивов (для ускорения запросов с помощью инкрементных запросов вместо полного сканирования). Этот инструмент использует Hive JDBC запускает запрос куста и сохраняет его результаты во временной таблице. Эту таблицу можно использовать вставить. обновление。Upsertполезность(HoodieDeltaStreamer
)具有目录结构необходимыйизвсесостояние,以了解目标表начальствоизпредставлять на рассмотрениечасмежду应为多少。Например:/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}. Формат зарегистрированной таблицы Delta Hive – {tmpdb}.{source_table}_{last_commit_included}.
。
Ниже приведены параметры конфигурации HiveIncrementalPuller.
| Конфигурация | описывать | значение по умолчанию | |hiveUrl| Улей для подключения Server 2 URL-адреса | | |hiveUser| Hive Server 2 имя пользователя | | |hivePass| Hive Server 2 пароль | | |queue| YARN имя очереди | | |tmp| Каталог в DFS, в котором хранятся временные добавочные данные. Структура каталогов будет соответствовать соглашению. См. раздел ниже. | | |extractSQLFile| SQL, который необходимо выполнить в исходной таблице для извлечения данных. Извлеченные данные будут представлять собой все строки, которые изменились с определенного момента времени. | | |sourceTable| Имя исходной таблицы. Его необходимо установить в свойствах среды Hive. | | |targetTable| Имя целевой таблицы. Требуется структура каталогов промежуточного хранилища. | | |sourceDataPath| Исходный базовый путь DFS. Здесь считываются метаданные Hudi. | | |targetDataPath| Целевой базовый путь DFS. Это необходимо для расчета fromCommitTime. Если fromCommitTime явно указано, этот параметр задавать не нужно. | | |tmpdb| База данных, используемая для создания промежуточных временных дельта-таблиц. | hoodie_temp | |fromCommitTime| Это самый важный параметр. Это момент времени, из которого извлекаются измененные записи. | | |maxCommits| Количество коммитов, включенных в запрос. Установка значения -1 будет включать все коммиты, начиная с fromCommitTime. Установка значения больше 0 будет включать записи, которые изменили только указанное количество коммитов после fromCommitTime. Возможно, вы захотите сделать это, если вам нужно выполнить два коммита одновременно. | 3 | |help| Справка по утилитам | | настраиватьfromCommitTime=0иmaxCommits=-1извлечет весь исходный набор данных,Может использоваться для запуска обратной засыпки. Если целевой набор данных является набором данных Hudi,затем утилита сможет определить, не был ли целевой набор данных отправлен или задерживается более чем на 24 часа (это возможно),Он будет автоматически использовать конфигурацию Backfill.,Потому что постепенное применение изменений за последние 24 часа займет больше времени, чем обратное заполнение. Текущим ограничением инструмента является отсутствие поддержки самостоятельного объединения одной и той же таблицы в смешанных режимах (обычном и инкрементном).
Инструкции по запросам Hive, выполняемым с помощью задач Fetch:потому чтоFetchЗадача вызывается для каждого разделаInputFormat.listStatus(),Каждый вызов listStatus() перечисляет метаданные Hoodie. Чтобы избежать этой ситуации,Следующее может быть полезно,Это использование Hive session属性对увеличиватьколичество Запрос ЗапрещатьFetchЗадача:set hive.fetch.task.conversion = none
;。Это обеспечитHiveЗапросиспользоватьMap Сокращение исполнения, Объедините разделы (разделенные запятыми) и вызовите InputFormat.listStatus() только один раз для всех из них.
Spark позволяет легко развертывать и управлять банками и пакетами Hudi в заданиях/блокнотах. Короче говоря, есть два способа получить доступ к наборам данных Hudi через Spark.
Как правило, ваше задание искры должно зависеть от hudi-spark или hudi-spark-bundle-x.y.z.jar, который должен находиться в пути к классам драйвера и исполнителя (совет: используйте параметр --jars).
Читать оптимизированную таблицу {#spark-ro-view}
Чтобы прочитать таблицу RO в таблицу Hive с помощью SparkSQL, просто вставьте фильтр пути в sparkContext, как показано ниже. Для таблиц Hudi этот метод сохраняет встроенные оптимизации Spark для чтения файлов Parquet, например векторизованное чтение.
spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);
Если вы хотите использовать глобальный путь в DFS через источник данных, вы можете просто сделать что-то вроде следующего, чтобы получить кадр данных Spark.
Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi")
// pass any path glob, can include hudi & non-hudi datasets
.load("/glob/path/pattern");
Таблица реального времени {#spark-rt-view}
текущий,Таблицу реального времени можно запросить только в Spark как таблицу Hive. чтобы сделать это,настраиватьspark.sql.hive.convertMetastoreParquet = false
, Заставить Spark вернуться к использованию Hive Serde читает данные (планирование/исполнение по-прежнему Spark).
$ spark-shell --jars hudi-spark-bundle-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages com.databricks:spark-avro_2.11:4.0.0 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client
scala> sqlContext.sql("select count(*) from hudi_rt where datestr = '2016-10-02'").show()
Инкрементальное притяжение {#spark-incr-pull}
hudi-spark
Модуль обеспечиваетDataSource API — более элегантный способ извлечения данных из наборов данных Hudi и их обработки через Spark. Ниже показан пример постепенного притяжение,Он будет получен изbeginInstantTime
списатьизвсе记录。
Dataset<Row> hoodieIncViewDF = spark.read()
.format("org.apache.hudi")
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),
DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
<beginInstantTime>)
.load(tablePath); // For incremental view, pass in the root/base path of dataset
См. раздел «Настройки», чтобы просмотреть все параметры источника данных.
Кроме того, HoodieReadClient предоставляет следующие функции посредством неявной индексации Hudi.
| API | описывать | | read(keys) | Используйте собственный индекс Hudi для считывания данных, соответствующих ключу, в виде DataFrame с быстрым поиском. | | filterExists() | Отфильтровать существующие записи из предоставленного RDD[HoodieRecord]. Полезно для удаления повторяющихся данных. | | checkExists(keys) | Проверяет, существует ли предоставленный ключ в наборе данных Hudi. |
Presto — это широко используемый механизм запросов, обеспечивающий интерактивную производительность запросов. Худи ROТаблица может находиться вPrestoсерединабесшовный Запрос。Это требует, чтобыhudi-presto-bundle jarвставить<presto_install>/plugin/hive-hadoop2/середина
。
Если вы хотите быстро извлечь данные в HDFS или облачное хранилище, Hudi может вам помочь. Кроме того, если ваши задания ETL/hive/spark выполняются медленно или ресурсоемко, Hudi может помочь, предоставив способ постепенного чтения и записи данных.
как организация,Hudi может помочь вам создать эффективное озеро данных,Решите некоторые из наиболее сложных проблем низкого уровня, связанных с Управлением хранилищем.,при этом быстрее доставляя данные аналитикам данных,Инженеры и ученые.
Hudi не предназначен для каких-либо случаев OLTP, в этих случаях обычно вы используете существующее хранилище данных NoSQL/RDBMS. Hudi не может заменить вашу аналитическую базу данных в памяти (по крайней мере, пока!). Hudi поддерживает прием практически в реальном времени за считанные минуты, компенсируя задержку для эффективной пакетной обработки. Если вам действительно нужна задержка обработки менее минуты, используйте свое любимое решение для обработки потоков.
Поэтапная обработка была впервые представлена Винотом Чандаром в блоге O'reilly, где объясняется большая часть работы. С чисто технической точки зрения, инкрементная обработка просто означает написание мини-пакетных программ в потоковом режиме. Типичное пакетное задание потребляет все входные данные и пересчитывает все выходные данные каждые несколько часов. Типичное задание потоковой обработки постоянно/каждые несколько секунд потребляет новые входные данные и пересчитывает новые/измененные выходные данные. Хотя, возможно, было бы проще пересчитать все выходные данные в пакетном режиме, это расточительно и дорого. Hudi имеет возможность писать один и тот же пакетный конвейер в потоковом режиме, запускающемся каждые несколько минут.
Хотя это можно назвать потоковой обработкой, мы предпочитаем называть ее инкрементальной обработкой, чтобы отличить ее от конвейеров чисто потоковой обработки, созданных с помощью Apache Flink, Apache Apex или Apache Kafka Streams.
Копировать при записи (Копировать On Напишите): Этот тип Хранилище позволяет клиентам принимать данные в столбчатом формате файла (в настоящее время — паркет). Используйте коровий тип хранилищачас,Любые новые данные, записанные в набор данных Hudi, будут записаны в новый файл паркета. Обновление существующих строк приведет к перезаписи всего файла паркета, содержащего затронутые строки. поэтому,Все записи в такие наборы данных ограничены производительностью записи в паркет.,Чем больше файл паркета,Чем больше времени требуется для приема данных.
Объединить во время чтения (Объединить On Читать): Этот тип Хранилище позволяет клиентам быстро принимать данные в форматы данных на основе строк (например, avro). Используйте МОРтип хранилищачас,Любые новые данные, записанные в набор данных Hudi, будут записаны в новый файл журнала/дельта.,Эти файлы внутренне кодируют данные в формате avro. Процесс сжатия (компрессия) (встроенная или асинхронная конфигурация) преобразует формат файла журнала в столбчатый формат файла (паркет).
Два разныхиз格式提供了Два разныхвид(Читать вид оптимизациии Вид в реальном времени),Оптимизация чтения зависит от производительности чтения файлов столбчатого паркета.,И Вид в реальном времени Зависит от типа столбцаи/илифайл журналаиз Чтение производительности。
Обновление существующей строки приведет к: а) записи обновления файла журнала/дельта из ранее созданного базового файла паркета посредством сжатия или б) записи обновления файла журнала/дельта без обновления сжатия; Поэтому все записи в такие наборы данных ограничены производительностью записи файла avro/log, которая намного быстрее, чем у паркета (требует копирования при записи). Однако чтение файлов журнала/дельта обходится дороже (требует слияния при чтении) по сравнению со столбчатыми (паркетными) файлами.
Основная цель Hudi — обеспечить функциональность обновления, которая на несколько порядков быстрее, чем перезапись всей таблицы или раздела. Выбирайте хранилище с копированием при записи (COW), если соблюдаются следующие условия:
Выберите хранилище слиянием при чтении (MOR), если выполняются следующие условия:
Независимо от того, какое хранилище вы выберете, Hudi предоставит:
Типичная база данных имеет несколько долго работающих серверов, которые предоставляют услуги чтения и записи. Архитектура Hudi отличается от других. Она сильно отделена от чтения и записи и может независимо расширять запись и запросы/чтение, чтобы справиться с проблемами расширения. Так что это не всегда может быть похоже на базу данных.
Тем не менее, Hudi очень похож на базу данных и обеспечивает аналогичную функциональность (обновления, фиксация изменений) и семантику (транзакционная запись, чтение с изолированными снимками).
При записи данных в Hudi записи можно моделировать так же, как в хранилище значений ключа: укажите ключевое поле (уникальное для одного раздела/всего набора данных), поле раздела (представляющее раздел, в котором должен быть помещен ключ), и логика preCombine/combine (используется для указания способа обработки повторяющихся записей в пакете записанных записей). Эта модель позволяет Hudi применять ограничения первичного ключа, как и в таблицах базы данных. Посмотрите пример здесь.
При запросе/чтении данных Hudi просто отображает себя в виде иерархической таблицы в формате JSON, и все привыкли использовать Hive/Spark/Presto для запроса Parquet/Json/Avro.
В общем, Hudi может предоставить эту функциональность в любой реализации файловой системы Hadoop, поэтому наборы данных можно читать и записывать в Cloud Store (Amazon S3, Microsoft Azure или Google Cloud Storage). Hudi также был специально разработан для упрощения создания наборов данных Hudi в облаке, таких как проверки согласованности для S3, отсутствие необходимости перемещения/переименования файлов данных.
Начиная с сентября 2019 года Hudi может поддерживать Spark 2.1+, Hive 2.x, Hadoop 2.7+ (не Hadoop 3).
На высоком уровне Hudi основан на конструкции MVCC, записывая данные в паркетный/базовый файл, а также в различные версии файла журнала, содержащего изменения, внесенные в базовый файл. Все файлы хранятся в секционированной схеме набора данных, которая очень похожа на то, как таблицы Apache Hive располагаются в DFS.
Обычно вы получаете частичные обновления/вставки из источника, а затем выполняете запись в набор данных Hudi. DeltaStreamer полезен, если вы извлекаете данные из других стандартных источников, таких как Kafka или Tailf DFS, предоставляя простое самостоятельно управляемое решение для записи данных в Hudi. Вы также можете написать собственный код, чтобы использовать API источника данных Spark для получения данных из пользовательского источника, а также использовать источник данных Hudi для записи в Hudi.
Преимущество записи в Hudi заключается в том, что она выполняется так же, как и любое другое задание Spark, выполняемое в YARN/Mesos или даже в кластере K8S. Просто используйте пользовательский интерфейс Spark для просмотра операций записи без настройки отдельного кластера Hudi.
Если синхронизация Hive не включена, наборы данных, записанные в Hudi с помощью вышеуказанных методов, можно просто запросить из источника данных Spark, как и из любого другого источника.
val hoodieROView =spark.read.format("org.apache.hudi").load(basePath +"/path/to/partitions/*")
val hoodieIncViewDF =spark.read().format("org.apache.hudi")
.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), <beginInstantTime>)
.load(basePath);
пожалуйста, обрати внимание:текущий НетподдерживатьотSparkЧтение источника данных Вид в реальном времени。请использовать下面изHiveпуть。
Если синхронизация Hive включена в инструменте deltastreamer или источнике данных, набор данных синхронизируется с несколькими таблицами в Hive и может быть прочитан с помощью HiveQL, Presto или SparkSQL. Нажмите здесь, чтобы увидеть больше.
При выполнении операции обновления набора данных предоставленные записи содержат несколько записей для данного ключа, а затем все записи объединяются в одно конечное значение путем многократного вызова метода preCombine класса полезных данных. По умолчанию выбирается запись с наибольшим значением (определяемым функцией CompareTo).
для вставить или Операция Bulk_insert не выполнена preCombine。поэтому,Если ваш ввод содержит дубликаты,тогда набор данных также будет содержать дубликаты. Если вы не хотите дублировать записи,Пожалуйста, используйте upsert или укажите элемент Конфигурация в источнике данных или дельтастримере, чтобы удалить повторяющиеся данные.
Как и выше, определите методы, определенные классом полезных данных (combineAndGetUpdateValue(), getInsertValue()), которые управляют тем, как сохраненная запись объединяется с входными обновлениями/вставками для получения окончательного значения, которое будет записано обратно в хранилище.
GDPR делает удаление важным инструментом в наборе инструментов управления данными. Hudi поддерживает мягкое и принудительное удаление.
Hudi предоставляет встроенную поддержку миграции с помощью Предоставлено hudi-cli Инструмент HDFSParquetImporter сразу записывает весь набор данных в Hudi. Также можно Использовать API источника данных Spark считывает и записывает наборы данных. После миграции Можно использовать Обсуждаемые здесь традиционные методы выполнения операций записи. Здесь также подробно обсуждается этот вопрос, включая методы частичной миграции.
Сюда входят элементы конфигурации источников данных и клиентов записи Hudi (как дельтастример, так и источники данных вызываются внутри). В таком инструменте, как DeltaStreamer, вызовите --help распечатывает все использованные параметры управления. Upsert, параметры изменения размера файла определяются на уровне клиента, вот как они передаются в элемент Конфигурация, который можно использовать для записи данных.
1). дляSpark DataSource, вы можете использовать DataFrameWriter options APIдоставить эти Конфигурацияэлемент。
inputDF.write().format("org.apache.hudi")
.options(clientOpts)// any of the Hudi client opts can be passed in as well
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),"_row_key")
...
2) При непосредственном использовании HoodieWriteClient просто используйте Конфигурацию для создания объекта HoodieWriteConfig.
3). При использовании инструмента HoodieDeltaStreamer для извлечения вы можете установить элемент Конфигурация в файле свойств и использовать файл в качестве параметра командной строки. --реквизит пройден.
Да, это можно сделать с помощью автономного инструмента Hive Sync или с помощью инструмента deltastreamer или параметров в источнике данных.
Индекс является ключевой частью написания Hudi, он всегда сопоставляет заданный ключ записи с группой файлов (FileGroup) внутри Hudi. Это позволяет быстрее идентифицировать файловые группы, на которые влияет данная операция записи.
Hudi поддерживает следующие индексы.
Вы также можете настроить индекс, вам необходимо реализовать класс HoodieIndex и ввести имя класса индекса в Конфигурация.
Hudi Очиститель (очиститель) обычно в совершить и Запустите сразу после deltacommit, чтобы удалить старые файлы, которые больше не нужны. Если вы используете постепенное функции притяжения, убедитесь, что Конфигурация очищает элементы, чтобы сохранить достаточное количество коммитов, чтобы их можно было откатить. Еще одно соображение — предоставить длительным заданиям достаточно времени для завершения их выполнения. В противном случае Очиститель может удалить файлы, которые читается или могут быть прочитаны заданием, и привести к сбою задания. Обычно конфигурация по умолчанию, равная 10, позволяет выполнять извлечение каждые 30 минут, сохраняя до 5 (10 * 0,5) часов данных. Вы можете увеличить hoodie.cleaner.commits.retained
Конфигурацияэлементизценить。
Худи использует Avro
для протоколаизвнутреннее представительство,Это в основномпотому чтоэто хорошоиз Совместимость архитектурыи Характеристики эволюции。Это тоже приемилиETLТрубопроводы остаются надежнымииз Ключ。Просто перейдите кHudiизмодель(Будь то вDeltaStreamer
Показать предоставлено или авторомSparkDatasource
изDataset
неявный режим)обратная совместимость(Например Нетудалитьлюбое поле,Добавляйте только новые поля),Hudi将бесшовный处理新旧данныеизизчитать/операция записывает и будет поддерживать схему Hive в актуальном состоянии.
Самый простой способ выполнить сжатие набора данных MOR — запустить сжатие в реальном времени, но это занимает больше времени. Обычно это может быть особенно полезно, когда в старые разделы попадает небольшое количество просроченных данных, и в этом случае вам может потребоваться сжать последние N разделов, ожидая, пока старые разделы накопит достаточно журналов. В конечном итоге большая часть последних данных преобразуется в формат столбцов, оптимизированный для запросов, то есть из файлов журналов в файлы паркета.
Сжатие также можно выполнять асинхронно, что можно выполнить как отдельную задачу сжатия. Если вы используете DeltaStreamer, вы можете выполнять сжатие в непрерывном режиме, при котором прием и сжатие происходят одновременно в рамках одной задачи Spark.
Скорость записи в Hudi варьируется между операциями записи и изменением размера файла. Точно так же, как база данных требует дополнительных затрат на ввод-вывод в качестве прямого/необработанного файла на диске, Hudi может нести накладные расходы по сравнению с такими функциями, как чтение/запись необработанных файлов DFS или резервное копирование баз данных. Худи использует методы из литературы по базам данных, чтобы минимизировать эти накладные расходы, как подробно описано в таблице ниже.
Как и многие системы, которые управляют данными временных рядов,Если ключ имеет префикс временной метки или монотонно увеличивается/уменьшается,Тогда выступление Худи будет лучше,И мы почти всегда этого добиваемся. Даже ключи UUID,Вы также можете следовать следующим советам, чтобы получить упорядоченные ключи. Дополнительные советы по JVM и другой конфигурации см. в Руководстве по настройке.
Ключевой дизайн Hudi заключается в том, чтобы избегать создания небольших файлов и всегда записывать файлы соответствующего размера, которые тратят больше времени на прием/запись, чтобы обеспечить эффективность запросов. Метод записи очень маленьких файлов с последующим объединением решает только проблему масштабируемости системы, вызванную маленькими файлами, что в любом случае замедлит скорость запросов из-за маленьких файлов.
осуществлятьвставить Операция обновления/вставки, Hudi может настроить размер файла. (Примечание. Операция Bulk_insert не предоставляет эту функциональность и предназначена для использования вместо нее. spark.write.parquet
。)
для При написаниикопировать,Возможна настройка максимального размера базового/паркетного файла и мягких ограничений.,Файлы размером меньше указанного лимита являются небольшими файлами. Худи попытается добавить достаточно записей в небольшой файл во время записи.,так, чтобы он достиг максимального предела Конфигурация. Например,для compactionSmallFileSize=100MB
и limitFileSize=120MB
,Hudi выберет все файлы размером менее 100 МБ.,и попытался увеличить его до 120 МБ.
дляобъединить при чтении,Других Конфигураций почти нет. Возможна настройка максимального размера журнала и коэффициента,Этот коэффициент представляет собой степень уменьшения размера при преобразовании данных из файлов avro в файлы паркета.
HUDI-26
будет меньшеизфайловая группаслить成较大изфайловая группа,Тем самым улучшая производительность.
Hudi поддерживает запись неразделенных наборов данных. Если вы хотите записать несекционированный набор данных Hudi и выполнить синхронизацию таблицы единиц конфигурации, вам необходимо установить следующую Конфигурацию в переданных свойствах:
hoodie.datasource.write.keygenerator.class=org.apache.hudi.NonpartitionedKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
Механизмы, не относящиеся к Hive, обычно сами пересчитывают файлы в DFS для запроса набора данных. Например, Spark считывает пути непосредственно из файловой системы (HDFS или S3).
Искра вызывается следующим образом:
Не понимая структуру файлов Худи, движок просто прочитает все файлы паркета и отобразит результаты, которые могут иметь большое количество дубликатов в результатах.
Существует два способа запроса конфигурации для правильного чтения набора данных Hudi.
A) вызовHoodieParquetInputFormat#getSplits
иHoodieParquetInputFormat#getRecordReader
метод
Б) Создайте фильтр пути вызова механизма или другие методы для прямого вызова класса Hudi для фильтрации файлов в DFS и выбора последних фрагментов файлов.
Часть этих данных можно импортировать в новую таблицу hudi пакетно. Например, данные за один месяц
spark.read.parquet("your_data_set/path/to/month")
.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "bulk_insert")
.option("hoodie.datasource.write.storage.type", "storage_type") // COPY_ON_WRITE or MERGE_ON_READ
.option(RECORDKEY_FIELD_OPT_KEY, "<your key>").
.option(PARTITIONPATH_FIELD_OPT_KEY, "<your_partition>")
...
.mode(SaveMode.Append)
.save(basePath);
Получив исходную копию, вы можете выбрать несколько образцов данных для операций обновления и вставки.
spark.read.parquet("your_data_set/path/to/month").limit(n) // Limit n records
.write.format("org.apache.hudi")
.option("hoodie.datasource.write.operation", "upsert")
.option(RECORDKEY_FIELD_OPT_KEY, "<your key>").
.option(PARTITIONPATH_FIELD_OPT_KEY, "<your_partition>")
...
.mode(SaveMode.Append)
.save(basePath);
Объединение таблиц, читаемых для, требует планирования и выполнения задач сжатия. Можно использовать spark отправитьотправить напрямую org.apache.hudi.utilities.HoodieCompactor выполняет сжатие и также может использовать HUDI. CLI выполняет сжатие.